Hi everyone,

Sorry I haven't got around to updating the KIP yet. Now that I've wrapped up KIP-989, I'm going to be working on KIP-1035 starting today.
I'll update the KIP first, and then call a vote. Regards, Nick On Wed, 29 May 2024 at 07:25, Bruno Cadonna <cado...@apache.org> wrote: > Totally agree on moving forward and starting the VOTE! > > However, the KIP should be updated with the new info before starting the > VOTE. > > Best, > Bruno > > On 5/29/24 2:36 AM, Matthias J. Sax wrote: > > Sounds like a good plan. -- I think we are still wrapping up 3.8 > > release, but would also like to move forward with with one. > > > > Should we start a VOTE? > > > > For merging PRs we need to wait after code freeze, and 3.8 branch was > > but. But we could start reviewing PRs before this already. > > > > > > -Matthias > > > > On 5/17/24 3:05 AM, Nick Telford wrote: > >> Hi everyone, > >> > >> As discussed on the Zoom call, we're going to handle rebalance > >> meta-data by: > >> > >> - On start-up, Streams will open each store and read its changelog > >> offsets > >> into an in-memory cache. This cache will be shared among all > >> StreamThreads. > >> - On rebalance, the cache will be consulted for Task offsets for any > Task > >> that is not active on any instance-local StreamThreads. If the Task is > >> active on *any* instance-local StreamThread, we will report the Task > >> lag as > >> "up to date" (i.e. -1), because we know that the local state is > currently > >> up-to-date. > >> > >> We will avoid caching offsets across restarts in the legacy > ".checkpoint" > >> file, so that we can eliminate the logic for handling this class. If > >> performance of opening/closing many state stores is poor, we can > >> parallelise it by forking off a thread for each Task directory when > >> reading > >> the offsets. > >> > >> I'll update the KIP later today to reflect this design, but I will try > to > >> keep it high-level, so that the exact implementation can vary. > >> > >> Regards, > >> > >> Nick > >> > >> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman < > sop...@responsive.dev> > >> wrote: > >> > >>> 103: I like the idea of immediately deprecating #managesOffsets and > >>> aiming > >>> to make offset management mandatory in the long run. I assume we > >>> would also > >>> log a warning for any custom stores that return "false" from this > >>> method to > >>> encourage custom store implementations to start doing so? My only > >>> question/concern is that if we want folks to start managing their own > >>> offsets then we should make this transition easy for them, perhaps by > >>> exposing some public utility APIs for things that are currently > >>> handled by > >>> Kafka Streams such as reading/writing checkpoint files. Maybe it > >>> would be > >>> useful to include a small example in the KIP of what it would > >>> actually mean > >>> to "manage your own offsets" -- I know (all too well) that plugging in > >>> custom storage implementations is not easy and most people who do > >>> this are > >>> probably fairly advanced users, but offset management will be a > >>> totally new > >>> ballgame to most people people and this kind of feels like throwing > them > >>> off the deep end. We should at least provide a lifejacket via some > >>> kind of > >>> utility API and/or example > >>> > >>> 200. There's been a lot of back and forth on the rebalance > metadata/task > >>> lag computation question, so forgive me if I missed any part of this, > >>> but I > >>> think we've landed at the right idea here. 
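[For illustration of the approach Nick summarizes at the top of this message -- open each store once at startup, read its changelog offsets into an in-memory cache shared by all StreamThreads, and consult that cache during rebalances for tasks not active locally -- a rough sketch. The class and the committedOffsetsOf() helper are hypothetical stand-ins, not the proposed implementation:]

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.TaskId;

    // Sketch of the startup offsets cache described above. Built once when the
    // Kafka Streams client starts and then shared (effectively read-only) by
    // all StreamThreads when they compute task lags for a rebalance.
    public class ChangelogOffsetsCache {

        private final Map<TaskId, Map<TopicPartition, Long>> cache = new ConcurrentHashMap<>();

        // called once on startup, before the first rebalance
        public void populate(final Iterable<TaskId> taskDirectoriesOnDisk) {
            for (final TaskId taskId : taskDirectoriesOnDisk) {
                cache.put(taskId, committedOffsetsOf(taskId));   // hypothetical: open stores, read offsets, close
            }
        }

        // consulted during rebalance, but only for tasks that are NOT active
        // on any local StreamThread (active tasks are reported as up to date)
        public Map<TopicPartition, Long> offsetsFor(final TaskId taskId) {
            return cache.get(taskId);
        }

        private Map<TopicPartition, Long> committedOffsetsOf(final TaskId taskId) {
            // hypothetical helper: open the task's stores without initializing a
            // full ProcessorStateManager, call committedOffset() on each, close again
            throw new UnsupportedOperationException("illustrative only");
        }
    }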
To summarize: the "tl;dr" > >>> explanation is that we'll write the checkpoint file only on close and > >>> will > >>> account for hard-crash scenarios by opening up the stores on startup > and > >>> writing a checkpoint file for any missing tasks. Does that sound about > >>> right? > >>> > >>> A few clarifications: > >>> I think we're all more or less on the same page here but just to be > >>> absolutely clear, the task lags for each task directory found on disk > >>> will > >>> be reported by only one of the StreamThreads, and each StreamThread > will > >>> report lags only for tasks that it already owns or are not assigned > >>> to any > >>> other StreamThread in the client. In other words, we only need to get > >>> the > >>> task lag for completely unassigned/unlocked tasks, which means if > >>> there is > >>> a checkpoint file at all then it must be up-to-date, because there is > no > >>> other StreamThread actively writing to that state store (if so then > only > >>> that StreamThread would report lag for that particular task). > >>> > >>> This still leaves the "no checkpoint at all" case which as previously > >>> mentioned can occur after a hard-crash. Luckily we only have to worry > >>> about this once, after starting up again following said hard crash. > >>> We can > >>> simply open up each of the state stores before ever joining the > >>> group, get > >>> the offsets from rocksdb, and write them to a new checkpoint file. > After > >>> that, we can depend on the checkpoints written at close and won't > >>> have to > >>> open up any stores that aren't already assigned for the reasons laid > >>> out in > >>> the paragraph above. > >>> > >>> As for the specific mechanism and which thread-does-what, since there > >>> were > >>> some questions, this is how I'm imagining the process: > >>> > >>> 1. The general idea is that we simply go through each task > >>> directories > >>> with state but no checkpoint file and open the StateStore, call > >>> #committedOffset, and then write it to the checkpoint file. We > >>> can then > >>> close these stores and let things proceed as normal. > >>> 2. This only has to happen once, during startup, but we have two > >>> options: > >>> 1. Do this from KafkaStreams#start, ie before we even create the > >>> StreamThreads > >>> 2. Do this from StreamThread#start, following a similar > >>> lock-based > >>> approach to the one used #computeTaskLags, where each > >>> StreamThread > >>> just > >>> makes a pass over the task directories on disk and attempts to > >>> lock > >>> them > >>> one by one. If they obtain the lock, check whether there is > state > >>> but no > >>> checkpoint, and write the checkpoint if needed. If it can't grab > >>> the lock, > >>> then we know one of the other StreamThreads must be handling the > >>> checkpoint > >>> file for that task directory, and we can move on. > >>> > >>> Don't really feel too strongly about which approach is best, doing > >>> it in > >>> KafkaStreams#start is certainly the most simple while doing it in the > >>> StreamThread's startup is more efficient. If we're worried about > >>> adding too > >>> much weight to KafkaStreams#start then the 2nd option is probably best, > >>> though slightly more complicated. > >>> > >>> Thoughts? > >>> > >>> On Tue, May 14, 2024 at 10:02 AM Nick Telford <nick.telf...@gmail.com> > >>> wrote: > >>> > >>>> Hi everyone, > >>>> > >>>> Sorry for the delay in replying. I've finally now got some time to > work > >>> on > >>>> this. 
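[To make the startup pass Sophie describes above (option 2, the lock-based variant) concrete, a rough sketch follows. OffsetCheckpoint is an existing Streams-internal class; tryLockTaskDirectory(), unlockTaskDirectory(), taskDirectoriesOnDisk(), taskDirectory() and committedOffsetsOf() are hypothetical stand-ins for the StateDirectory internals, used only for illustration:]

    import java.io.File;
    import java.io.IOException;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    // Sketch: each StreamThread makes one pass over the task directories on
    // disk, takes the directory lock, and writes a checkpoint file for any
    // directory that has state but no checkpoint (i.e. after a hard crash).
    void writeMissingCheckpointsOnStartup() throws IOException {
        for (final TaskId taskId : taskDirectoriesOnDisk()) {          // hypothetical helper
            if (!tryLockTaskDirectory(taskId)) {                        // hypothetical helper
                continue;        // another StreamThread is handling this directory
            }
            try {
                final File checkpointFile = new File(taskDirectory(taskId), ".checkpoint");
                if (!checkpointFile.exists()) {
                    // hypothetical helper: open the store(s), read committed offsets, close again
                    final Map<TopicPartition, Long> offsets = committedOffsetsOf(taskId);
                    new OffsetCheckpoint(checkpointFile).write(offsets);
                }
            } finally {
                unlockTaskDirectory(taskId);                            // hypothetical helper
            }
        }
    }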
> >>>> > >>>> Addressing Matthias's comments: > >>>> > >>>> 100. > >>>> Good point. As Bruno mentioned, there's already > >>> AbstractReadWriteDecorator > >>>> which we could leverage to provide that protection. I'll add details > on > >>>> this to the KIP. > >>>> > >>>> 101,102. > >>>> It looks like these points have already been addressed by Bruno. Let > me > >>>> know if anything here is still unclear or you feel needs to be > detailed > >>>> more in the KIP. > >>>> > >>>> 103. > >>>> I'm in favour of anything that gets the old code removed sooner, but > >>>> wouldn't deprecating an API that we expect (some) users to implement > >>> cause > >>>> problems? > >>>> I'm thinking about implementers of custom StateStores, as they may be > >>>> confused by managesOffsets() being deprecated, especially since they > >>> would > >>>> have to mark their implementation as @Deprecated in order to avoid > >>> compile > >>>> warnings. > >>>> If deprecating an API *while it's still expected to be implemented* is > >>>> something that's generally done in the project, then I'm happy to do > so > >>>> here. > >>>> > >>>> 104. > >>>> I think this is technically possible, but at the cost of considerable > >>>> additional code to maintain. Would we ever have a pathway to remove > >>>> this > >>>> downgrade code in the future? > >>>> > >>>> > >>>> Regarding rebalance metadata: > >>>> Opening all stores on start-up to read and cache their offsets is an > >>>> interesting idea, especially if we can avoid re-opening the stores > once > >>> the > >>>> Tasks have been assigned. Scalability shouldn't be too much of a > >>>> problem, > >>>> because typically users have a fairly short state.cleanup.delay, so > the > >>>> number of on-disk Task directories should rarely exceed the number of > >>> Tasks > >>>> previously assigned to that instance. > >>>> An advantage of this approach is that it would also simplify > StateStore > >>>> implementations, as they would only need to guarantee that committed > >>>> offsets are available when the store is open. > >>>> > >>>> I'll investigate this approach this week for feasibility and report > >>>> back. > >>>> > >>>> I think that covers all the outstanding feedback, unless I missed > >>> anything? > >>>> > >>>> Regards, > >>>> Nick > >>>> > >>>> On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org> > wrote: > >>>> > >>>>> Hi Matthias, > >>>>> > >>>>> I see what you mean. > >>>>> > >>>>> To sum up: > >>>>> > >>>>> With this KIP the .checkpoint file is written when the store closes. > >>>>> That is when: > >>>>> 1. a task moves away from Kafka Streams client > >>>>> 2. Kafka Streams client shuts down > >>>>> > >>>>> A Kafka Streams client needs the information in the .checkpoint file > >>>>> 1. on startup because it does not have any open stores yet. > >>>>> 2. during rebalances for non-empty state directories of tasks that > are > >>>>> not assigned to the Kafka Streams client. > >>>>> > >>>>> With hard crashes, i.e., when the Streams client is not able to close > >>>>> its state stores and write the .checkpoint file, the .checkpoint file > >>>>> might be quite stale. That influences the next rebalance after > >>>>> failover > >>>>> negatively. > >>>>> > >>>>> > >>>>> My conclusion is that Kafka Streams either needs to open the state > >>>>> stores at start up or we write the checkpoint file more often. > >>>>> > >>>>> Writing the .checkpoint file during processing more often without > >>>>> controlling the flush to disk would work. 
However, Kafka Streams > would > >>>>> checkpoint offsets that are not yet persisted on disk by the state > >>>>> store. That is with a hard crash the offsets in the .checkpoint file > >>>>> might be larger than the offsets checkpointed in the state store. > That > >>>>> might not be a problem if Kafka Streams uses the .checkpoint file > only > >>>>> to compute the task lag. The downside is that it makes the managing > of > >>>>> checkpoints more complex because now we have to maintain two > >>>>> checkpoints: one for restoration and one for computing the task lag. > >>>>> I think we should explore the option where Kafka Streams opens the > >>> state > >>>>> stores at start up to get the offsets. > >>>>> > >>>>> I also checked when Kafka Streams needs the checkpointed offsets to > >>>>> compute the task lag during a rebalance. Turns out Kafka Streams > needs > >>>>> them before sending the join request. Now, I am wondering if opening > >>> the > >>>>> state stores of unassigned tasks whose state directory exists locally > >>> is > >>>>> actually such a big issue due to the expected higher latency since it > >>>>> happens actually before the Kafka Streams client joins the rebalance. > >>>>> > >>>>> Best, > >>>>> Bruno > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> On 5/4/24 12:05 AM, Matthias J. Sax wrote: > >>>>>> That's good questions... I could think of a few approaches, but I > >>> admit > >>>>>> it might all be a little bit tricky to code up... > >>>>>> > >>>>>> However if we don't solve this problem, I think this KIP does not > >>>> really > >>>>>> solve the core issue we are facing? In the end, if we rely on the > >>>>>> `.checkpoint` file to compute a task assignment, but the > >>> `.checkpoint` > >>>>>> file can be arbitrary stale after a crash because we only write it > >>> on a > >>>>>> clean close, there would be still a huge gap that this KIP does not > >>>>> close? > >>>>>> > >>>>>> For the case in which we keep the checkpoint file, this KIP would > >>> still > >>>>>> help for "soft errors" in which KS can recover, and roll back the > >>>> store. > >>>>>> A significant win for sure. -- But hard crashes would still be an > >>>>>> problem? We might assign tasks to "wrong" instance, ie, which are > not > >>>>>> most up to date, as the checkpoint information could be very > >>> outdated? > >>>>>> Would we end up with a half-baked solution? Would this be good > enough > >>>> to > >>>>>> justify the introduced complexity? In the, for soft failures it's > >>> still > >>>>>> a win. Just want to make sure we understand the limitations and make > >>> an > >>>>>> educated decision. > >>>>>> > >>>>>> Or do I miss something? > >>>>>> > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> On 5/3/24 10:20 AM, Bruno Cadonna wrote: > >>>>>>> Hi Matthias, > >>>>>>> > >>>>>>> > >>>>>>> 200: > >>>>>>> I like the idea in general. However, it is not clear to me how the > >>>>>>> behavior should be with multiple stream threads in the same Kafka > >>>>>>> Streams client. What stream thread opens which store? How can a > >>> stream > >>>>>>> thread pass an open store to another stream thread that got the > >>>>>>> corresponding task assigned? How does a stream thread know that a > >>> task > >>>>>>> was not assigned to any of the stream threads of the Kafka Streams > >>>>>>> client? I have the feeling we should just keep the .checkpoint file > >>> on > >>>>>>> close for now to unblock this KIP and try to find a solution to get > >>>>>>> totally rid of it later. 
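[For reference, the offsets discussed here feed into the per-task offset sums that are sent with the subscription before the join request (cf. TaskManager#getTaskOffsetSums()). A rough sketch, ignoring standby and named-topology details of the real code, and using -1 for locally active tasks as described earlier in this thread:]

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.TaskId;

    // Sketch: combine offsets of locally active tasks (reported as up to date)
    // with the cached/checkpointed offsets of unassigned tasks found on disk.
    Map<TaskId, Long> taskOffsetSums(final Set<TaskId> locallyActiveTasks,
                                     final Map<TaskId, Map<TopicPartition, Long>> offsetsOfUnassignedTasks) {
        final Map<TaskId, Long> offsetSums = new HashMap<>();
        for (final TaskId taskId : locallyActiveTasks) {
            offsetSums.put(taskId, -1L);    // locally active => "up to date"
        }
        offsetsOfUnassignedTasks.forEach((taskId, offsets) -> {
            if (!offsetSums.containsKey(taskId)) {
                offsetSums.put(taskId, offsets.values().stream().mapToLong(Long::longValue).sum());
            }
        });
        return offsetSums;
    }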
> >>>>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> Bruno > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> On 5/3/24 6:29 PM, Matthias J. Sax wrote: > >>>>>>>> 101: Yes, but what I am saying is, that we don't need to flush the > >>>>>>>> .position file to disk periodically, but only maintain it in main > >>>>>>>> memory, and only write it to disk on close() to preserve it across > >>>>>>>> restarts. This way, it would never be ahead, but might only lag? > >>> But > >>>>>>>> with my better understanding about (102) it might be mood > anyway... > >>>>>>>> > >>>>>>>> > >>>>>>>> 102: Thanks for clarifying. Looked into the code now. Makes sense. > >>>>>>>> Might be something to be worth calling out explicitly in the KIP > >>>>>>>> writeup. -- Now that I realize that the position is tracked inside > >>>>>>>> the store (not outside as the changelog offsets) it makes much > more > >>>>>>>> sense to pull position into RocksDB itself. In the end, it's > >>> actually > >>>>>>>> a "store implementation" detail how it tracks the position (and > >>> kinda > >>>>>>>> leaky abstraction currently, that we re-use the checkpoint file > >>>>>>>> mechanism to track it and flush to disk). > >>>>>>>> > >>>>>>>> > >>>>>>>> 200: I was thinking about this a little bit more, and maybe it's > >>> not > >>>>>>>> too bad? When KS starts up, we could upon all stores we find on > >>> local > >>>>>>>> disk pro-actively, and keep them all open until the first > rebalance > >>>>>>>> finishes: For tasks we get assigned, we hand in the already opened > >>>>>>>> store (this would amortize the cost to open the store before the > >>>>>>>> rebalance) and for non-assigned tasks, we know the offset > >>> information > >>>>>>>> won't change and we could just cache it in-memory for later reuse > >>>>>>>> (ie, next rebalance) and close the store to free up resources? -- > >>>>>>>> Assuming that we would get a large percentage of opened stores > >>>>>>>> assigned as tasks anyway, this could work? > >>>>>>>> > >>>>>>>> > >>>>>>>> -Matthias > >>>>>>>> > >>>>>>>> On 5/3/24 1:29 AM, Bruno Cadonna wrote: > >>>>>>>>> Hi Matthias, > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 101: > >>>>>>>>> Let's assume a RocksDB store, but I think the following might be > >>>>>>>>> true also for other store implementations. With this KIP, if > Kafka > >>>>>>>>> Streams commits the offsets, the committed offsets will be stored > >>> in > >>>>>>>>> an in-memory data structure (i.e. the memtable) and stay there > >>> until > >>>>>>>>> RocksDB decides that it is time to persist its in-memory data > >>>>>>>>> structure. If Kafka Streams writes its position to the .position > >>>>>>>>> file during a commit and a crash happens before RocksDB persist > >>> the > >>>>>>>>> memtable then the position in the .position file is ahead of the > >>>>>>>>> persisted offset. If IQ is done between the crash and the state > >>>>>>>>> store fully restored the changelog, the position might tell IQ > >>> that > >>>>>>>>> the state store is more up-to-date than it actually is. > >>>>>>>>> In contrast, if Kafka Streams handles persisting positions the > >>> same > >>>>>>>>> as persisting offset, the position should always be consistent > >>> with > >>>>>>>>> the offset, because they are persisted together. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 102: > >>>>>>>>> I am confused about your confusion which tells me that we are > >>>>>>>>> talking about two different things. > >>>>>>>>> You asked > >>>>>>>>> > >>>>>>>>> "Do you intent to add this information [i.e. 
position] to the map > >>>>>>>>> passed via commit(final Map<TopicPartition, Long> > >>>> changelogOffsets)?" > >>>>>>>>> > >>>>>>>>> and with what I wrote I meant that we do not need to pass the > >>>>>>>>> position into the implementation of the StateStore interface > since > >>>>>>>>> the position is updated within the implementation of the > >>> StateStore > >>>>>>>>> interface (e.g. RocksDBStore [1]). My statement describes the > >>>>>>>>> behavior now, not the change proposed in this KIP, so it does not > >>>>>>>>> contradict what is stated in the KIP. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 200: > >>>>>>>>> This is about Matthias' main concern about rebalance metadata. > >>>>>>>>> As far as I understand the KIP, Kafka Streams will only use the > >>>>>>>>> .checkpoint files to compute the task lag for unassigned tasks > >>> whose > >>>>>>>>> state is locally available. For assigned tasks, it will use the > >>>>>>>>> offsets managed by the open state store. > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Bruno > >>>>>>>>> > >>>>>>>>> [1] > >>>>>>>>> > >>>>> > >>>> > >>> > https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397 > >>>>>>>>> > >>>>>>>>> On 5/1/24 3:00 AM, Matthias J. Sax wrote: > >>>>>>>>>> Thanks Bruno. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 101: I think I understand this better now. But just want to make > >>>>>>>>>> sure I do. What do you mean by "they can diverge" and > "Recovering > >>>>>>>>>> after a failure might load inconsistent offsets and positions." > >>>>>>>>>> > >>>>>>>>>> The checkpoint is the offset from the changelog, while the > >>> position > >>>>>>>>>> is the offset from the upstream source topic, right? -- In the > >>> end, > >>>>>>>>>> the position is about IQ, and if we fail to update it, it only > >>>>>>>>>> means that there is some gap when we might not be able to query > a > >>>>>>>>>> standby task, because we think it's not up-to-date enough even > if > >>>>>>>>>> it is, which would resolve itself soon? Ie, the position might > >>>>>>>>>> "lag", but it's not "inconsistent". Do we believe that this lag > >>>>>>>>>> would be highly problematic? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 102: I am confused. > >>>>>>>>>> > >>>>>>>>>>> The position is maintained inside the state store, but is > >>>>>>>>>>> persisted in the .position file when the state store closes. > >>>>>>>>>> > >>>>>>>>>> This contradicts the KIP: > >>>>>>>>>> > >>>>>>>>>>> these position offsets will be stored in RocksDB, in the same > >>>>>>>>>>> column family as the changelog offsets, instead of the > .position > >>>>> file > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> My main concern is currently about rebalance metadata -- opening > >>>>>>>>>> RocksDB stores seems to be very expensive, but if we follow the > >>>> KIP: > >>>>>>>>>> > >>>>>>>>>>> We will do this under EOS by updating the .checkpoint file > >>>>>>>>>>> whenever a store is close()d. > >>>>>>>>>> > >>>>>>>>>> It seems, having the offset inside RocksDB does not help us at > >>> all? > >>>>>>>>>> In the end, when we crash, we don't want to lose the state, but > >>>>>>>>>> when we update the .checkpoint only on a clean close, the > >>>>>>>>>> .checkpoint might be stale (ie, still contains the checkpoint > >>> when > >>>>>>>>>> we opened the store when we got a task assigned). 
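[To make the "same column family as the changelog offsets" idea concrete: a minimal sketch of persisting an offset atomically with the data it covers, using the plain RocksDB Java API. This is only an illustration of the mechanism, not the actual RocksDBStore code; the path and key names are made up:]

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    public class OffsetColumnFamilySketch {
        public static void main(final String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            final List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
                new ColumnFamilyDescriptor("offsets".getBytes(StandardCharsets.UTF_8)));
            final List<ColumnFamilyHandle> handles = new ArrayList<>();
            try (final DBOptions options = new DBOptions()
                     .setCreateIfMissing(true)
                     .setCreateMissingColumnFamilies(true);
                 final RocksDB db = RocksDB.open(options, "/tmp/kip-1035-sketch", descriptors, handles)) {

                final ColumnFamilyHandle data = handles.get(0);
                final ColumnFamilyHandle offsets = handles.get(1);

                // The record and its changelog offset go into one WriteBatch, so
                // they hit the memtable (and later disk) atomically together --
                // the store can never persist one without the other.
                try (final WriteBatch batch = new WriteBatch();
                     final WriteOptions writeOptions = new WriteOptions()) {
                    batch.put(data, "key".getBytes(StandardCharsets.UTF_8),
                              "value".getBytes(StandardCharsets.UTF_8));
                    batch.put(offsets, "changelog-topic-0".getBytes(StandardCharsets.UTF_8),
                              Long.toString(42L).getBytes(StandardCharsets.UTF_8));
                    db.write(writeOptions, batch);
                }
                handles.forEach(ColumnFamilyHandle::close);
            }
        }
    }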
> >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> -Matthias > >>>>>>>>>> > >>>>>>>>>> On 4/30/24 2:40 AM, Bruno Cadonna wrote: > >>>>>>>>>>> Hi all, > >>>>>>>>>>> > >>>>>>>>>>> 100 > >>>>>>>>>>> I think we already have such a wrapper. It is called > >>>>>>>>>>> AbstractReadWriteDecorator. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 101 > >>>>>>>>>>> Currently, the position is checkpointed when a offset > checkpoint > >>>>>>>>>>> is written. If we let the state store manage the committed > >>>>>>>>>>> offsets, we need to also let the state store also manage the > >>>>>>>>>>> position otherwise they might diverge. State store managed > >>> offsets > >>>>>>>>>>> can get flushed (i.e. checkpointed) to the disk when the state > >>>>>>>>>>> store decides to flush its in-memory data structures, but the > >>>>>>>>>>> position is only checkpointed at commit time. Recovering after > a > >>>>>>>>>>> failure might load inconsistent offsets and positions. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 102 > >>>>>>>>>>> The position is maintained inside the state store, but is > >>>>>>>>>>> persisted in the .position file when the state store closes. > The > >>>>>>>>>>> only public interface that uses the position is IQv2 in a > >>>>>>>>>>> read-only mode. So the position is only updated within the > state > >>>>>>>>>>> store and read from IQv2. No need to add anything to the public > >>>>>>>>>>> StateStore interface. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 103 > >>>>>>>>>>> Deprecating managesOffsets() right away might be a good idea. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 104 > >>>>>>>>>>> I agree that we should try to support downgrades without wipes. > >>> At > >>>>>>>>>>> least Nick should state in the KIP why we do not support it. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Bruno > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On 4/23/24 8:13 AM, Matthias J. Sax wrote: > >>>>>>>>>>>> Thanks for splitting out this KIP. The discussion shows, that > >>> it > >>>>>>>>>>>> is a complex beast by itself, so worth to discuss by its own. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Couple of question / comment: > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> 100 `StateStore#commit()`: The JavaDoc says "must not be > called > >>>>>>>>>>>> by users" -- I would propose to put a guard in place for this, > >>> by > >>>>>>>>>>>> either throwing an exception (preferable) or adding a no-op > >>>>>>>>>>>> implementation (at least for our own stores, by wrapping them > >>> -- > >>>>>>>>>>>> we cannot enforce it for custom stores I assume), and document > >>>>>>>>>>>> this contract explicitly. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> 101 adding `.position` to the store: Why do we actually need > >>>>>>>>>>>> this? The KIP says "To ensure consistency with the committed > >>> data > >>>>>>>>>>>> and changelog offsets" but I am not sure if I can follow? Can > >>> you > >>>>>>>>>>>> elaborate why leaving the `.position` file as-is won't work? > >>>>>>>>>>>> > >>>>>>>>>>>>> If it's possible at all, it will need to be done by > >>>>>>>>>>>>> creating temporary StateManagers and StateStores during > >>>>>>>>>>>>> rebalance. I think > >>>>>>>>>>>>> it is possible, and probably not too expensive, but the devil > >>>>>>>>>>>>> will be in > >>>>>>>>>>>>> the detail. > >>>>>>>>>>>> > >>>>>>>>>>>> This sounds like a significant overhead to me. 
We know that > >>>>>>>>>>>> opening a single RocksDB takes about 500ms, and thus opening > >>>>>>>>>>>> RocksDB to get this information might slow down rebalances > >>>>>>>>>>>> significantly. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> 102: It's unclear to me, how `.position` information is added. > >>>>>>>>>>>> The KIP only says: "position offsets will be stored in > RocksDB, > >>>>>>>>>>>> in the same column family as the changelog offsets". Do you > >>>>>>>>>>>> intent to add this information to the map passed via > >>>>>>>>>>>> `commit(final Map<TopicPartition, Long> changelogOffsets)`? > The > >>>>>>>>>>>> KIP should describe this in more detail. Also, if my > assumption > >>>>>>>>>>>> is correct, we might want to rename the parameter and also > >>> have a > >>>>>>>>>>>> better JavaDoc description? > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> 103: Should we make it mandatory (long-term) that all stores > >>>>>>>>>>>> (including custom stores) manage their offsets internally? > >>>>>>>>>>>> Maintaining both options and thus both code paths puts a > burden > >>>>>>>>>>>> on everyone and make the code messy. I would strongly prefer > if > >>>>>>>>>>>> we could have mid-term path to get rid of supporting both. -- > >>>>>>>>>>>> For this case, we should deprecate the newly added > >>>>>>>>>>>> `managesOffsets()` method right away, to point out that we > >>> intend > >>>>>>>>>>>> to remove it. If it's mandatory to maintain offsets for > stores, > >>>>>>>>>>>> we won't need this method any longer. In memory stores can > just > >>>>>>>>>>>> return null from #committedOffset(). > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> 104 "downgrading": I think it might be worth to add support > for > >>>>>>>>>>>> downgrading w/o the need to wipe stores? Leveraging > >>>>>>>>>>>> `upgrade.from` parameter, we could build a two rolling bounce > >>>>>>>>>>>> downgrade: (1) the new code is started with `upgrade.from` set > >>> to > >>>>>>>>>>>> a lower version, telling the runtime to do the cleanup on > >>>>>>>>>>>> `close()` -- (ie, ensure that all data is written into > >>>>>>>>>>>> `.checkpoint` and `.position` file, and the newly added CL is > >>>>>>>>>>>> deleted). In a second, rolling bounce, the old code would be > >>> able > >>>>>>>>>>>> to open RocksDB. -- I understand that this implies much more > >>>>>>>>>>>> work, but downgrade seems to be common enough, that it might > be > >>>>>>>>>>>> worth it? Even if we did not always support this in the past, > >>> we > >>>>>>>>>>>> have the face the fact that KS is getting more and more > adopted > >>>>>>>>>>>> and as a more mature product should support this? > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> -Matthias > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> On 4/21/24 11:58 PM, Bruno Cadonna wrote: > >>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>> > >>>>>>>>>>>>> How should we proceed here? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1. with the plain .checkpoint file > >>>>>>>>>>>>> 2. with a way to use the state store interface on unassigned > >>> but > >>>>>>>>>>>>> locally existing task state > >>>>>>>>>>>>> > >>>>>>>>>>>>> While I like option 2, I think option 1 is less risky and > will > >>>>>>>>>>>>> give us the benefits of transactional state stores sooner. We > >>>>>>>>>>>>> should consider the interface approach afterwards, though. 
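[Matthias' point 100 above, sketched: a wrapper in the spirit of AbstractReadWriteDecorator that rejects user calls to commit(). The real guard would live in the existing internal decorator hierarchy; delegating implementations of the rest of the StateStore interface are omitted here for brevity:]

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateStore;

    // Sketch: every store handed to processor code is wrapped so that commit()
    // -- which only the Streams engine may call -- throws if invoked by users.
    public class CommitGuard {

        private final StateStore inner;

        public CommitGuard(final StateStore inner) {
            this.inner = inner;
        }

        // new KIP-1035 method, signature as discussed in this thread
        public void commit(final Map<TopicPartition, Long> changelogOffsets) {
            throw new UnsupportedOperationException(
                "StateStore#commit() must not be called by users; Kafka Streams manages commits");
        }

        public String name() {
            return inner.name();    // example of a plain delegating method
        }
    }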
> >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best, > >>>>>>>>>>>>> Bruno > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On 4/17/24 3:15 PM, Bruno Cadonna wrote: > >>>>>>>>>>>>>> Hi Nick and Sophie, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I think the task ID is not enough to create a state store > >>> that > >>>>>>>>>>>>>> can read the offsets of non-assigned tasks for lag > >>> computation > >>>>>>>>>>>>>> during rebalancing. The state store also needs the state > >>>>>>>>>>>>>> directory so that it knows where to find the information > that > >>>>>>>>>>>>>> it needs to return from changelogOffsets(). > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> In general, I think we should proceed with the plain > >>>>>>>>>>>>>> .checkpoint file for now and iterate back to the state store > >>>>>>>>>>>>>> solution later since it seems it is not that > straightforward. > >>>>>>>>>>>>>> Alternatively, Nick could timebox an effort to better > >>>>>>>>>>>>>> understand what would be needed for the state store > solution. > >>>>>>>>>>>>>> Nick, let us know your decision. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Regarding your question about the state store instance. I am > >>>>>>>>>>>>>> not too familiar with that part of the code, but I think the > >>>>>>>>>>>>>> state store is build when the processor topology is build > and > >>>>>>>>>>>>>> the processor topology is build per stream task. So there is > >>>>>>>>>>>>>> one instance of processor topology and state store per > stream > >>>>>>>>>>>>>> task. Try to follow the call in [1]. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Bruno > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> [1] > >>>>>>>>>>>>>> > >>>>> > >>>> > >>> > https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153 > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On 4/16/24 8:59 PM, Nick Telford wrote: > >>>>>>>>>>>>>>> That does make sense. The one thing I can't figure out is > >>> how > >>>>>>>>>>>>>>> per-Task > >>>>>>>>>>>>>>> StateStore instances are constructed. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> It looks like we construct one StateStore instance for the > >>>>>>>>>>>>>>> whole Topology > >>>>>>>>>>>>>>> (in InternalTopologyBuilder), and pass that into > >>>>>>>>>>>>>>> ProcessorStateManager (via > >>>>>>>>>>>>>>> StateManagerUtil) for each Task, which then initializes it. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> This can't be the case though, otherwise multiple > partitions > >>>>>>>>>>>>>>> of the same > >>>>>>>>>>>>>>> sub-topology (aka Tasks) would share the same StateStore > >>>>>>>>>>>>>>> instance, which > >>>>>>>>>>>>>>> they don't. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> What am I missing? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman > >>>>>>>>>>>>>>> <sop...@responsive.dev> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I don't think we need to *require* a constructor accept > the > >>>>>>>>>>>>>>>> TaskId, but we > >>>>>>>>>>>>>>>> would definitely make sure that the RocksDB state store > >>>>>>>>>>>>>>>> changes its > >>>>>>>>>>>>>>>> constructor to one that accepts the TaskID (which we can > do > >>>>>>>>>>>>>>>> without > >>>>>>>>>>>>>>>> deprecation since its an internal API), and custom state > >>>>>>>>>>>>>>>> stores can just > >>>>>>>>>>>>>>>> decide for themselves whether they want to opt-in/use the > >>>>>>>>>>>>>>>> TaskId param > >>>>>>>>>>>>>>>> or not. 
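[For illustration, a custom supplier opting in to the proposed #get(TaskId) overload might look roughly like this. The overload itself is only the proposal from the message above (not a released API), and MyTaskAwareStore with its TaskId-accepting constructor is hypothetical:]

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Sketch of a custom supplier opting in to the proposed StoreSupplier#get(TaskId).
    public class MyStoreSupplier implements KeyValueBytesStoreSupplier {

        private final String name;

        public MyStoreSupplier(final String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public KeyValueStore<Bytes, byte[]> get() {
            return new MyTaskAwareStore(name, null);    // hypothetical store; no task known yet
        }

        // the proposed overload -- no @Override since it is not yet part of the API
        public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
            return new MyTaskAwareStore(name, taskId);  // store can now locate its task directory
        }

        @Override
        public String metricsScope() {
            return "my-task-aware-store";
        }
    }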
I mean custom state stores would have to opt-in > >>>>>>>>>>>>>>>> anyways by > >>>>>>>>>>>>>>>> implementing the new StoreSupplier#get(TaskId) API and the > >>>> only > >>>>>>>>>>>>>>>> reason to do that would be to have created a constructor > >>> that > >>>>>>>>>>>>>>>> accepts > >>>>>>>>>>>>>>>> a TaskId > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Just to be super clear about the proposal, this is what I > >>> had > >>>>>>>>>>>>>>>> in mind. > >>>>>>>>>>>>>>>> It's actually fairly simple and wouldn't add much to the > >>>>>>>>>>>>>>>> scope of the > >>>>>>>>>>>>>>>> KIP (I think -- if it turns out to be more complicated > than > >>>>>>>>>>>>>>>> I'm assuming, > >>>>>>>>>>>>>>>> we should definitely do whatever has the smallest LOE to > >>> get > >>>>>>>>>>>>>>>> this done > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Anyways, the (only) public API changes would be to add > this > >>>> new > >>>>>>>>>>>>>>>> method to the StoreSupplier API: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> default T get(final TaskId taskId) { > >>>>>>>>>>>>>>>> return get(); > >>>>>>>>>>>>>>>> } > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> We can decide whether or not to deprecate the old #get but > >>>>>>>>>>>>>>>> it's not > >>>>>>>>>>>>>>>> really necessary and might cause a lot of turmoil, so I'd > >>>>>>>>>>>>>>>> personally > >>>>>>>>>>>>>>>> say we just leave both APIs in place. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> And that's it for public API changes! Internally, we would > >>>>>>>>>>>>>>>> just adapt > >>>>>>>>>>>>>>>> each of the rocksdb StoreSupplier classes to implement > this > >>>> new > >>>>>>>>>>>>>>>> API. So for example with the > >>>> RocksDBKeyValueBytesStoreSupplier, > >>>>>>>>>>>>>>>> we just add > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> @Override > >>>>>>>>>>>>>>>> public KeyValueStore<Bytes, byte[]> get(final TaskId > >>> taskId) > >>>> { > >>>>>>>>>>>>>>>> return returnTimestampedStore ? > >>>>>>>>>>>>>>>> new RocksDBTimestampedStore(name, > metricsScope(), > >>>>>>>>>>>>>>>> taskId) : > >>>>>>>>>>>>>>>> new RocksDBStore(name, metricsScope(), taskId); > >>>>>>>>>>>>>>>> } > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> And of course add the TaskId parameter to each of the > >>> actual > >>>>>>>>>>>>>>>> state store constructors returned here. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Does that make sense? It's entirely possible I'm missing > >>>>>>>>>>>>>>>> something > >>>>>>>>>>>>>>>> important here, but I think this would be a pretty small > >>>>>>>>>>>>>>>> addition that > >>>>>>>>>>>>>>>> would solve the problem you mentioned earlier while also > >>>> being > >>>>>>>>>>>>>>>> useful to anyone who uses custom state stores. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford > >>>>>>>>>>>>>>>> <nick.telf...@gmail.com> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Sophie, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Interesting idea! Although what would that mean for the > >>>>>>>>>>>>>>>>> StateStore > >>>>>>>>>>>>>>>>> interface? Obviously we can't require that the > constructor > >>>>>>>>>>>>>>>>> take the > >>>>>>>>>>>>>>>> TaskId. > >>>>>>>>>>>>>>>>> Is it enough to add the parameter to the StoreSupplier? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Would doing this be in-scope for this KIP, or are we > >>>>>>>>>>>>>>>>> over-complicating > >>>>>>>>>>>>>>>> it? 
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman > >>>>>>>>>>>>>>>>> <sop...@responsive.dev > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Somewhat minor point overall, but it actually drives me > >>>>>>>>>>>>>>>>>> crazy that you > >>>>>>>>>>>>>>>>>> can't get access to the taskId of a StateStore until > >>> #init > >>>>>>>>>>>>>>>>>> is called. > >>>>>>>>>>>>>>>>> This > >>>>>>>>>>>>>>>>>> has caused me a huge headache personally (since the same > >>> is > >>>>>>>>>>>>>>>>>> true for > >>>>>>>>>>>>>>>>>> processors and I was trying to do something that's > >>> probably > >>>>>>>>>>>>>>>>>> too hacky > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> actually complain about here lol) > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Can we just change the StateStoreSupplier to receive and > >>>>>>>>>>>>>>>>>> pass along the > >>>>>>>>>>>>>>>>>> taskId when creating a new store? Presumably by adding a > >>>>>>>>>>>>>>>>>> new version of > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> #get method that takes in a taskId parameter? We can > have > >>>>>>>>>>>>>>>>>> it default to > >>>>>>>>>>>>>>>>>> invoking the old one for compatibility reasons and it > >>>>>>>>>>>>>>>>>> should be > >>>>>>>>>>>>>>>>> completely > >>>>>>>>>>>>>>>>>> safe to tack on. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Would also prefer the same for a ProcessorSupplier, but > >>>>> that's > >>>>>>>>>>>>>>>> definitely > >>>>>>>>>>>>>>>>>> outside the scope of this KIP > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford > >>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On further thought, it's clear that this can't work for > >>>>>>>>>>>>>>>>>>> one simple > >>>>>>>>>>>>>>>>>> reason: > >>>>>>>>>>>>>>>>>>> StateStores don't know their associated TaskId (and > >>> hence, > >>>>>>>>>>>>>>>>>>> their > >>>>>>>>>>>>>>>>>>> StateDirectory) until the init() call. Therefore, > >>>>>>>>>>>>>>>>>>> committedOffset() > >>>>>>>>>>>>>>>>> can't > >>>>>>>>>>>>>>>>>>> be called before init(), unless we also added a > >>>>>>>>>>>>>>>>>>> StateStoreContext > >>>>>>>>>>>>>>>>>> argument > >>>>>>>>>>>>>>>>>>> to committedOffset(), which I think might be trying to > >>>>>>>>>>>>>>>>>>> shoehorn too > >>>>>>>>>>>>>>>>> much > >>>>>>>>>>>>>>>>>>> into committedOffset(). > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I still don't like the idea of the Streams engine > >>>>>>>>>>>>>>>>>>> maintaining the > >>>>>>>>>>>>>>>> cache > >>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>> changelog offsets independently of stores, mostly > >>> because > >>>>>>>>>>>>>>>>>>> of the > >>>>>>>>>>>>>>>>>>> maintenance burden of the code duplication, but it > looks > >>>>>>>>>>>>>>>>>>> like we'll > >>>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> live with it. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Unless you have any better ideas? 
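[For context on the init() point above: today a store only learns its TaskId, and hence its state directory, once init(StateStoreContext, StateStore) is called. A minimal sketch, with only the init-related code shown:]

    import java.io.File;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreContext;
    import org.apache.kafka.streams.processor.TaskId;

    // Sketch: the TaskId and state directory only become available inside
    // init(), which is why committedOffset() cannot be served for stores that
    // were never initialized.
    public class InitAwareStoreSketch {

        private TaskId taskId;
        private File stateDir;

        public void init(final StateStoreContext context, final StateStore root) {
            this.taskId = context.taskId();     // first point where the store learns its task
            this.stateDir = context.stateDir(); // .../state.dir/<application.id>/<taskId>
            // ... open files / RocksDB instances under stateDir here ...
        }
    }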
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 14:12, Nick Telford > >>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Bruno, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Immediately after I sent my response, I looked at the > >>>>>>>>>>>>>>>>>>>> codebase and > >>>>>>>>>>>>>>>>> came > >>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>> the same conclusion. If it's possible at all, it will > >>>>>>>>>>>>>>>>>>>> need to be > >>>>>>>>>>>>>>>> done > >>>>>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>>>> creating temporary StateManagers and StateStores > during > >>>>>>>>>>>>>>>>>>>> rebalance. > >>>>>>>>>>>>>>>> I > >>>>>>>>>>>>>>>>>>> think > >>>>>>>>>>>>>>>>>>>> it is possible, and probably not too expensive, but > the > >>>>>>>>>>>>>>>>>>>> devil will > >>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>> the detail. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I'll try to find some time to explore the idea to see > >>> if > >>>>>>>>>>>>>>>>>>>> it's > >>>>>>>>>>>>>>>>> possible > >>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>> report back, because we'll need to determine this > >>> before > >>>>>>>>>>>>>>>>>>>> we can > >>>>>>>>>>>>>>>> vote > >>>>>>>>>>>>>>>>> on > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> KIP. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna > >>>>>>>>>>>>>>>>>>>> <cado...@apache.org> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Hi Nick, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks for reacting on my comments so quickly! > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 2. > >>>>>>>>>>>>>>>>>>>>> Some thoughts on your proposal. > >>>>>>>>>>>>>>>>>>>>> State managers (and state stores) are parts of tasks. > >>> If > >>>>>>>>>>>>>>>>>>>>> the task > >>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>>> assigned locally, we do not create those tasks. To > get > >>>>>>>>>>>>>>>>>>>>> the offsets > >>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>> your approach, we would need to either create kind of > >>>>>>>>>>>>>>>>>>>>> inactive > >>>>>>>>>>>>>>>> tasks > >>>>>>>>>>>>>>>>>>>>> besides active and standby tasks or store and manage > >>>> state > >>>>>>>>>>>>>>>> managers > >>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>> non-assigned tasks differently than the state > managers > >>>>>>>>>>>>>>>>>>>>> of assigned > >>>>>>>>>>>>>>>>>>>>> tasks. Additionally, the cleanup thread that removes > >>>>>>>>>>>>>>>>>>>>> unassigned > >>>>>>>>>>>>>>>> task > >>>>>>>>>>>>>>>>>>>>> directories needs to concurrently delete those > >>> inactive > >>>>>>>>>>>>>>>>>>>>> tasks or > >>>>>>>>>>>>>>>>>>>>> task-less state managers of unassigned tasks. This > >>> seems > >>>>>>>>>>>>>>>>>>>>> all quite > >>>>>>>>>>>>>>>>>> messy > >>>>>>>>>>>>>>>>>>>>> to me. > >>>>>>>>>>>>>>>>>>>>> Could we create those state managers (or state > stores) > >>>>>>>>>>>>>>>>>>>>> for locally > >>>>>>>>>>>>>>>>>>>>> existing but unassigned tasks on demand when > >>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() is executed? Or have > a > >>>>>>>>>>>>>>>>>>>>> different > >>>>>>>>>>>>>>>>>>>>> encapsulation for the unused task directories? 
> >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>>>>> Bruno > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On 4/10/24 11:31 AM, Nick Telford wrote: > >>>>>>>>>>>>>>>>>>>>>> Hi Bruno, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the review! > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 1, 4, 5. > >>>>>>>>>>>>>>>>>>>>>> Done > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 3. > >>>>>>>>>>>>>>>>>>>>>> You're right. I've removed the offending paragraph. > I > >>>> had > >>>>>>>>>>>>>>>>> originally > >>>>>>>>>>>>>>>>>>>>>> adapted this from the guarantees outlined in > KIP-892. > >>>>>>>>>>>>>>>>>>>>>> But it's > >>>>>>>>>>>>>>>>>>>>> difficult to > >>>>>>>>>>>>>>>>>>>>>> provide these guarantees without the KIP-892 > >>>> transaction > >>>>>>>>>>>>>>>> buffers. > >>>>>>>>>>>>>>>>>>>>> Instead, > >>>>>>>>>>>>>>>>>>>>>> we'll add the guarantees back into the JavaDoc when > >>>>>>>>>>>>>>>>>>>>>> KIP-892 > >>>>>>>>>>>>>>>> lands. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 2. > >>>>>>>>>>>>>>>>>>>>>> Good point! This is the only part of the KIP that > was > >>>>>>>>>>>>>>>>>> (significantly) > >>>>>>>>>>>>>>>>>>>>>> changed when I extracted it from KIP-892. My > >>> prototype > >>>>>>>>>>>>>>>>>>>>>> currently > >>>>>>>>>>>>>>>>>>>>> maintains > >>>>>>>>>>>>>>>>>>>>>> this "cache" of changelog offsets in .checkpoint, > but > >>>>>>>>>>>>>>>>>>>>>> doing so > >>>>>>>>>>>>>>>>>> becomes > >>>>>>>>>>>>>>>>>>>>> very > >>>>>>>>>>>>>>>>>>>>>> messy. My intent with this change was to try to > >>> better > >>>>>>>>>>>>>>>> encapsulate > >>>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>> offset "caching", especially for StateStores that > can > >>>>>>>>>>>>>>>>>>>>>> cheaply > >>>>>>>>>>>>>>>>>> provide > >>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> offsets stored directly in them without needing to > >>>>>>>>>>>>>>>>>>>>>> duplicate > >>>>>>>>>>>>>>>> them > >>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>> cache. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> It's clear some more work is needed here to better > >>>>>>>>>>>>>>>>>>>>>> encapsulate > >>>>>>>>>>>>>>>>> this. > >>>>>>>>>>>>>>>>>>> My > >>>>>>>>>>>>>>>>>>>>>> immediate thought is: what if we construct *but > don't > >>>>>>>>>>>>>>>> initialize* > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> StateManager and StateStores for every Task > directory > >>>>>>>>>>>>>>>>>>>>>> on-disk? > >>>>>>>>>>>>>>>>> That > >>>>>>>>>>>>>>>>>>>>> should > >>>>>>>>>>>>>>>>>>>>>> still be quite cheap to do, and would enable us to > >>>>>>>>>>>>>>>>>>>>>> query the > >>>>>>>>>>>>>>>>> offsets > >>>>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>> all on-disk stores, even if they're not open. If the > >>>>>>>>>>>>>>>> StateManager > >>>>>>>>>>>>>>>>>>> (aka. > >>>>>>>>>>>>>>>>>>>>>> ProcessorStateManager/GlobalStateManager) proves too > >>>>>>>>>>>>>>>>>>>>>> expensive > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> hold > >>>>>>>>>>>>>>>>>>>>> open > >>>>>>>>>>>>>>>>>>>>>> for closed stores, we could always have a > >>>>>>>>>>>>>>>>>>>>>> "StubStateManager" in > >>>>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>>>>> place, > >>>>>>>>>>>>>>>>>>>>>> that enables the querying of offsets, but nothing > >>> else? > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> IDK, what do you think? 
> >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna > >>>>>>>>>>>>>>>>>>>>>> <cado...@apache.org> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Hi Nick, > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for breaking out the KIP from KIP-892! > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Here a couple of comments/questions: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 1. > >>>>>>>>>>>>>>>>>>>>>>> In Kafka Streams, we have a design guideline which > >>>>>>>>>>>>>>>>>>>>>>> says to not > >>>>>>>>>>>>>>>>> use > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> "get"-prefix for getters on the public API. Could > >>> you > >>>>>>>>>>>>>>>>>>>>>>> please > >>>>>>>>>>>>>>>>> change > >>>>>>>>>>>>>>>>>>>>>>> getCommittedOffsets() to committedOffsets()? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 2. > >>>>>>>>>>>>>>>>>>>>>>> It is not clear to me how > >>>>> TaskManager#getTaskOffsetSums() > >>>>>>>>>>>>>>>> should > >>>>>>>>>>>>>>>>>> read > >>>>>>>>>>>>>>>>>>>>>>> offsets of tasks the stream thread does not own but > >>>>>>>>>>>>>>>>>>>>>>> that have a > >>>>>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>>>>>>> directory on the Streams client by calling > >>>>>>>>>>>>>>>>>>>>>>> StateStore#getCommittedOffsets(). If the thread > does > >>>>>>>>>>>>>>>>>>>>>>> not own a > >>>>>>>>>>>>>>>>> task > >>>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>> does also not create any state stores for the task, > >>>>>>>>>>>>>>>>>>>>>>> which means > >>>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>>> no state store on which to call > >>> getCommittedOffsets(). > >>>>>>>>>>>>>>>>>>>>>>> I would have rather expected that a checkpoint file > >>> is > >>>>>>>>>>>>>>>>>>>>>>> written > >>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>> all > >>>>>>>>>>>>>>>>>>>>>>> state stores on close -- not only for the > >>> RocksDBStore > >>>>>>>>>>>>>>>>>>>>>>> -- and > >>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>>> checkpoint file is read in > >>>>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() for > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> tasks > >>>>>>>>>>>>>>>>>>>>>>> that have a state directory on the client but are > >>> not > >>>>>>>>>>>>>>>>>>>>>>> currently > >>>>>>>>>>>>>>>>>>>>> assigned > >>>>>>>>>>>>>>>>>>>>>>> to any stream thread of the Streams client. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 3. > >>>>>>>>>>>>>>>>>>>>>>> In the javadocs for commit() you write > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> "... all writes since the last commit(Map), or > since > >>>>>>>>>>>>>>>>>> init(StateStore) > >>>>>>>>>>>>>>>>>>>>>>> *MUST* be available to readers, even after a > >>> restart." > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> This is only true for a clean close before the > >>>>>>>>>>>>>>>>>>>>>>> restart, isn't > >>>>>>>>>>>>>>>> it? > >>>>>>>>>>>>>>>>>>>>>>> If the task fails with a dirty close, Kafka Streams > >>>>>>>>>>>>>>>>>>>>>>> cannot > >>>>>>>>>>>>>>>>>> guarantee > >>>>>>>>>>>>>>>>>>>>>>> that the in-memory structures of the state store > >>> (e.g. 
> >>>>>>>>>>>>>>>>>>>>>>> memtable > >>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> case of RocksDB) are flushed so that the records > and > >>>> the > >>>>>>>>>>>>>>>>> committed > >>>>>>>>>>>>>>>>>>>>>>> offsets are persisted. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 4. > >>>>>>>>>>>>>>>>>>>>>>> The wrapper that provides the legacy checkpointing > >>>>>>>>>>>>>>>>>>>>>>> behavior is > >>>>>>>>>>>>>>>>>>> actually > >>>>>>>>>>>>>>>>>>>>>>> an implementation detail. I would remove it from > the > >>>>>>>>>>>>>>>>>>>>>>> KIP, but > >>>>>>>>>>>>>>>>> still > >>>>>>>>>>>>>>>>>>>>>>> state that the legacy checkpointing behavior will > be > >>>>>>>>>>>>>>>>>>>>>>> supported > >>>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> state store does not manage the checkpoints. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 5. > >>>>>>>>>>>>>>>>>>>>>>> Regarding the metrics, could you please add the > >>> tags, > >>>>>>>>>>>>>>>>>>>>>>> and the > >>>>>>>>>>>>>>>>>>> recording > >>>>>>>>>>>>>>>>>>>>>>> level (DEBUG or INFO) as done in KIP-607 or > KIP-444. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>>>>>>> Bruno > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> On 4/7/24 5:35 PM, Nick Telford wrote: > >>>>>>>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Based on some offline discussion, I've split out > >>> the > >>>>>>>>>>>>>>>>>>>>>>>> "Atomic > >>>>>>>>>>>>>>>>>>>>>>> Checkpointing" > >>>>>>>>>>>>>>>>>>>>>>>> section from KIP-892: Transactional Semantics for > >>>>>>>>>>>>>>>>>>>>>>>> StateStores, > >>>>>>>>>>>>>>>>>> into > >>>>>>>>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>>>>>>> own > >>>>>>>>>>>>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> KIP-1035: StateStore managed changelog offsets > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>> > >>>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> While KIP-892 was adopted *with* the changes > >>> outlined > >>>>> in > >>>>>>>>>>>>>>>>> KIP-1035, > >>>>>>>>>>>>>>>>>>>>> these > >>>>>>>>>>>>>>>>>>>>>>>> changes were always the most contentious part, and > >>>>>>>>>>>>>>>>>>>>>>>> continued > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> spur > >>>>>>>>>>>>>>>>>>>>>>>> discussion even after KIP-892 was adopted. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> All the changes introduced in KIP-1035 have been > >>>>>>>>>>>>>>>>>>>>>>>> removed from > >>>>>>>>>>>>>>>>>>> KIP-892, > >>>>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>> a hard dependency on KIP-1035 has been added to > >>>>>>>>>>>>>>>>>>>>>>>> KIP-892 in > >>>>>>>>>>>>>>>> their > >>>>>>>>>>>>>>>>>>>>> place. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> I'm hopeful that with some more focus on this set > >>> of > >>>>>>>>>>>>>>>>>>>>>>>> changes, > >>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>>>>>>>> deliver something that we're all happy with. 
> >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>> > >>>> > >>> > >> >