Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-06 Thread Bruno Cadonna
Hi Matthias, I see what you mean. To sum up: with this KIP, the .checkpoint file is written when the store closes, that is, when 1. a task moves away from the Kafka Streams client, or 2. the Kafka Streams client shuts down. A Kafka Streams client needs the information in the .checkpoint file 1. on
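For readers unfamiliar with the file, here is a minimal sketch of reading a `.checkpoint` file. It assumes the plain-text layout used by Kafka Streams' internal `OffsetCheckpoint` class: a version line (`0`), an entry-count line, then one `topic partition offset` line per changelog partition. `CheckpointFileSketch` is a hypothetical name. A real reader should defer to `OffsetCheckpoint` itself rather than re-implement the format.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reader for the .checkpoint text layout assumed above.
final class CheckpointFileSketch {
    // Parses "version\ncount\ntopic partition offset\n..." into "topic-partition" -> offset.
    static Map<String, Long> parse(String content) {
        String[] lines = content.strip().split("\\R");
        int version = Integer.parseInt(lines[0].trim());
        if (version != 0) {
            throw new IllegalArgumentException("Unsupported checkpoint version: " + version);
        }
        int expected = Integer.parseInt(lines[1].trim());
        Map<String, Long> offsets = new HashMap<>();
        for (int i = 2; i < lines.length; i++) {
            String[] parts = lines[i].trim().split("\\s+");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Malformed entry: " + lines[i]);
            }
            offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
        }
        // Guard against a truncated or partially written file.
        if (offsets.size() != expected) {
            throw new IllegalArgumentException("Expected " + expected + " entries, found " + offsets.size());
        }
        return offsets;
    }
}
```

Because the client only reads this file, the sketch needs nothing beyond the file's text. That is why a file written on close is easy to consume for tasks that are not currently assigned.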

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
Those are good questions... I could think of a few approaches, but I admit they might all be a little tricky to code up... However, if we don't solve this problem, I think this KIP does not really solve the core issue we are facing? In the end, if we rely on the `.checkpoint` file to compute a
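To make the dependency on checkpointed offsets concrete, here is a sketch of deriving a task's restoration lag from them. It assumes that the lag of each changelog partition is the changelog end offset minus the locally checkpointed offset, and that a partition with no checkpoint entry must be restored from offset 0. `TaskLagSketch` is hypothetical and does not reproduce how Kafka Streams actually computes lags.

```java
import java.util.Map;

// Hypothetical lag computation over checkpointed changelog offsets.
final class TaskLagSketch {
    // Sums per-partition lag for one task; a missing checkpoint means a full restore.
    static long taskLag(Map<String, Long> endOffsets, Map<String, Long> checkpointed) {
        long total = 0L;
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            long end = e.getValue();
            long local = checkpointed.getOrDefault(e.getKey(), 0L);
            // Clamp at zero in case the checkpoint is somehow past the end offset.
            total += Math.max(0L, end - local);
        }
        return total;
    }
}
```

If the checkpoint is stale because it is only written on close, this number overestimates the lag. That is the kind of inaccuracy the thread is concerned about.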

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna
Hi Matthias, 200: I like the idea in general. However, it is not clear to me what the behavior should be with multiple stream threads in the same Kafka Streams client. Which stream thread opens which store? How can a stream thread pass an open store to another stream thread that got the

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
101: Yes, but what I am saying is that we don't need to flush the .position file to disk periodically; we only need to maintain it in main memory and write it to disk on close() to preserve it across restarts. This way, it would never be ahead, but might only lag? But with my better
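The proposal of keeping the position only in memory and persisting it on `close()` can be sketched as follows. The "disk" is modeled as a plain map standing in for the `.position` file, and `PositionTrackerSketch` is a hypothetical name, not a Kafka Streams class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker: position kept in memory, persisted only on close().
final class PositionTrackerSketch implements AutoCloseable {
    private final Map<String, Long> inMemory = new HashMap<>();
    private final Map<String, Long> disk; // stand-in for the .position file

    PositionTrackerSketch(Map<String, Long> disk) {
        this.disk = disk;
        inMemory.putAll(disk); // restore the last persisted position on open
    }

    // Positions only move forward per input partition.
    void update(String topicPartition, long offset) {
        inMemory.merge(topicPartition, offset, Long::max);
    }

    Map<String, Long> current() {
        return Map.copyOf(inMemory);
    }

    @Override
    public void close() {
        // Single write on clean shutdown; nothing is written periodically.
        disk.clear();
        disk.putAll(inMemory);
    }
}
```

If the process crashes before `close()`, the reopened tracker starts from the older persisted map. Its position can therefore lag the data actually in the store but can never be ahead of it, which matches the property described in the message.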

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna
Hi Matthias, 101: Let's assume a RocksDB store, but I think the following might also be true for other store implementations. With this KIP, if Kafka Streams commits the offsets, the committed offsets will be stored in an in-memory data structure (i.e., the memtable) and stay there until
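A toy model of the RocksDB behavior described here may help. It assumes committed offsets are stored as an ordinary key alongside the data (a simplification; the KIP may store them differently) and ignores any write-ahead log. `MemtableModelSketch` and the reserved key are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: writes, including committed offsets, land in a volatile memtable
// and only become durable when the memtable is flushed.
final class MemtableModelSketch {
    static final String OFFSET_KEY = "__committed_offset"; // hypothetical reserved key

    private final Map<String, String> memtable = new HashMap<>();
    private final Map<String, String> durable = new HashMap<>();

    void put(String key, String value) {
        memtable.put(key, value);
    }

    void commitOffset(long offset) {
        memtable.put(OFFSET_KEY, Long.toString(offset));
    }

    // Data and offset move to durable storage together.
    void flush() {
        durable.putAll(memtable);
        memtable.clear();
    }

    // Simulates a crash: everything not yet flushed is lost.
    void crash() {
        memtable.clear();
    }

    long durableOffset() {
        String v = durable.get(OFFSET_KEY);
        return v == null ? -1L : Long.parseLong(v);
    }

    String durableValue(String key) {
        return durable.get(key);
    }
}
```

After a crash, the recovered offset is the one from the last flush, and it matches exactly the data that survived. The model shows why offsets held in the memtable stay consistent with the data even though they are not yet on disk.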

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Matthias J. Sax
Thanks Bruno. 101: I think I understand this better now, but I just want to make sure I do. What do you mean by "they can diverge" and "Recovering after a failure might load inconsistent offsets and positions." The checkpoint is the offset from the changelog, while the position is the

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Bruno Cadonna
Hi all, 100: I think we already have such a wrapper. It is called AbstractReadWriteDecorator. 101: Currently, the position is checkpointed when an offset checkpoint is written. If we let the state store manage the committed offsets, we also need to let the state store manage the position
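The concern in 101 is that changelog offsets and position must be persisted together. Here is a sketch under the assumption that a single immutable snapshot holding both maps is published in one reference swap; a real store would use an atomic write batch or file rename instead. `StoreSnapshot` and `AtomicCommitSketch` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical snapshot: changelog offsets and position persisted as one unit.
final class StoreSnapshot {
    final Map<String, Long> changelogOffsets;
    final Map<String, Long> position;

    StoreSnapshot(Map<String, Long> changelogOffsets, Map<String, Long> position) {
        this.changelogOffsets = Map.copyOf(changelogOffsets);
        this.position = Map.copyOf(position);
    }
}

final class AtomicCommitSketch {
    private volatile StoreSnapshot persisted = new StoreSnapshot(Map.of(), Map.of());
    private final Map<String, Long> pendingOffsets = new HashMap<>();
    private final Map<String, Long> pendingPosition = new HashMap<>();

    // Called for each write: remembers both the changelog offset and the input position.
    void onWrite(String changelogPartition, long changelogOffset, String inputPartition, long inputOffset) {
        pendingOffsets.put(changelogPartition, changelogOffset);
        pendingPosition.put(inputPartition, inputOffset);
    }

    // Both maps are published in one reference swap, so recovery never sees
    // offsets from one commit combined with a position from another.
    void commit() {
        Map<String, Long> offsets = new HashMap<>(persisted.changelogOffsets);
        offsets.putAll(pendingOffsets);
        Map<String, Long> position = new HashMap<>(persisted.position);
        position.putAll(pendingPosition);
        persisted = new StoreSnapshot(offsets, position);
        pendingOffsets.clear();
        pendingPosition.clear();
    }

    StoreSnapshot persisted() {
        return persisted;
    }
}
```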

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-23 Thread Matthias J. Sax
Thanks for splitting out this KIP. The discussion shows that it is a complex beast by itself, so it is worth discussing on its own. A couple of questions/comments: 100: `StateStore#commit()`: The JavaDoc says "must not be called by users" -- I would propose to put a guard in place for this, by
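The guard proposed in 100 can be sketched as a decorator that passes reads through but rejects `commit()`. The thread notes that Kafka Streams already has `AbstractReadWriteDecorator` for this purpose. The sketch below only illustrates the pattern and does not reproduce that class: `MiniStore` and `ReadWriteGuard` are hypothetical, and `MiniStore` is not the real `StateStore` interface.

```java
import java.util.Map;

// Hypothetical, heavily reduced store interface (not the real StateStore API).
interface MiniStore {
    String get(String key);

    void commit(Map<String, Long> changelogOffsets);
}

// Handed to user processors instead of the raw store: reads pass through,
// while commit() is reserved for the runtime.
final class ReadWriteGuard implements MiniStore {
    private final MiniStore inner;

    ReadWriteGuard(MiniStore inner) {
        this.inner = inner;
    }

    @Override
    public String get(String key) {
        return inner.get(key);
    }

    @Override
    public void commit(Map<String, Long> changelogOffsets) {
        throw new UnsupportedOperationException(
            "commit() is managed by Kafka Streams and must not be called from a processor");
    }
}
```

The runtime keeps a reference to the unwrapped store and calls `commit()` on it directly, so only user code is affected by the guard.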

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-22 Thread Bruno Cadonna
Hi all, how should we proceed here? 1. with the plain .checkpoint file, or 2. with a way to use the state store interface on unassigned but locally existing task state. While I like option 2, I think option 1 is less risky and will give us the benefits of transactional state stores sooner. We

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-17 Thread Bruno Cadonna
Hi Nick and Sophie, I think the task ID is not enough to create a state store that can read the offsets of non-assigned tasks for lag computation during rebalancing. The state store also needs the state directory so that it knows where to find the information that it needs to return from
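To illustrate why the task ID alone is insufficient, here is how a task's checkpoint location is derived. The sketch assumes the default on-disk layout `<state.dir>/<application.id>/<taskId>/.checkpoint`, with the task ID rendered as `<subtopology>_<partition>` (for example `0_1`). `TaskStatePaths` is hypothetical, and named topologies or other layouts are not covered.

```java
import java.nio.file.Path;

// Hypothetical helper: locating a task's checkpoint needs state.dir and
// application.id in addition to the task ID.
final class TaskStatePaths {
    static Path checkpointFile(String stateDir, String applicationId, int subtopology, int partition) {
        String taskDirName = subtopology + "_" + partition; // e.g. "0_1"
        return Path.of(stateDir, applicationId, taskDirName, ".checkpoint");
    }
}
```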

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-16 Thread Nick Telford
That does make sense. The one thing I can't figure out is how per-Task StateStore instances are constructed. It looks like we construct one StateStore instance for the whole Topology (in InternalTopologyBuilder), and pass that into ProcessorStateManager (via StateManagerUtil) for each Task, which

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-16 Thread Sophie Blee-Goldman
I don't think we need to *require* a constructor accept the TaskId, but we would definitely make sure that the RocksDB state store changes its constructor to one that accepts the TaskID (which we can do without deprecation since it's an internal API), and custom state stores can just decide for

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-15 Thread Nick Telford
Hi Sophie, Interesting idea! Although what would that mean for the StateStore interface? Obviously we can't require that the constructor take the TaskId. Is it enough to add the parameter to the StoreSupplier? Would doing this be in-scope for this KIP, or are we over-complicating it? Nick On
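The idea of passing the task identity at construction time can be sketched with a hypothetical task-aware supplier. Neither `TaskAwareStoreSupplier` nor `TaskAwareStore` mirrors a real Kafka Streams signature. The point is only that the store can report its task and directory before `init()` is called, whereas today that information arrives with the context at `init()`.

```java
import java.nio.file.Path;

// Hypothetical supplier that hands the task identity to the store at construction time.
@FunctionalInterface
interface TaskAwareStoreSupplier<S> {
    S get(String taskId, Path taskDirectory);
}

// Hypothetical store that knows its task before init() is ever called.
final class TaskAwareStore {
    private final String taskId;
    private final Path taskDirectory;
    private boolean initialized;

    TaskAwareStore(String taskId, Path taskDirectory) {
        this.taskId = taskId;
        this.taskDirectory = taskDirectory;
    }

    String taskId() {
        return taskId;
    }

    // Usable before init(), e.g. to locate committed offsets for lag computation.
    Path taskDirectory() {
        return taskDirectory;
    }

    boolean isInitialized() {
        return initialized;
    }

    void init() {
        initialized = true;
    }
}
```

Because the supplier is a functional interface, a constructor reference such as `TaskAwareStore::new` is enough to implement it. This keeps the burden on custom store implementations small.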

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-12 Thread Sophie Blee-Goldman
Somewhat minor point overall, but it actually drives me crazy that you can't get access to the taskId of a StateStore until #init is called. This has caused me a huge headache personally (since the same is true for processors and I was trying to do something that's probably too hacky to actually

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-12 Thread Nick Telford
On further thought, it's clear that this can't work for one simple reason: StateStores don't know their associated TaskId (and hence, their StateDirectory) until the init() call. Therefore, committedOffset() can't be called before init(), unless we also added a StateStoreContext argument to

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Nick Telford
Hi Bruno, Immediately after I sent my response, I looked at the codebase and came to the same conclusion. If it's possible at all, it will need to be done by creating temporary StateManagers and StateStores during rebalance. I think it is possible, and probably not too expensive, but the devil
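The "temporary StateStores during rebalance" approach amounts to opening local state only long enough to read its committed offsets and then releasing it. Here is a sketch using try-with-resources, where the on-disk offsets are modeled as a map passed in by the caller. `TemporaryOffsetReader` is hypothetical and elides everything a real store would need to open (RocksDB handles, locks, and so on).

```java
import java.util.Map;

// Hypothetical read-only view over a task's local state, opened only for the
// duration of a lag computation and closed immediately afterwards.
final class TemporaryOffsetReader implements AutoCloseable {
    private final Map<String, Long> offsets;
    private boolean closed;

    TemporaryOffsetReader(Map<String, Long> offsetsOnDisk) {
        this.offsets = Map.copyOf(offsetsOnDisk);
    }

    Map<String, Long> committedOffsets() {
        if (closed) {
            throw new IllegalStateException("reader already closed");
        }
        return offsets;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true; // a real store would release file handles and locks here
    }

    // Opens, reads, and closes in one step so no handle outlives the rebalance.
    static Map<String, Long> readOffsets(Map<String, Long> offsetsOnDisk) {
        try (TemporaryOffsetReader reader = new TemporaryOffsetReader(offsetsOnDisk)) {
            return reader.committedOffsets();
        }
    }
}
```

Whether this is cheap enough depends on the cost of opening a real store, which is the "devil in the details" the message alludes to.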

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Bruno Cadonna
Hi Nick, Thanks for reacting to my comments so quickly! 2. Some thoughts on your proposal. State managers (and state stores) are parts of tasks. If the task is not assigned locally, we do not create those tasks. To get the offsets with your approach, we would need to either create kind of

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Nick Telford
Hi Bruno, Thanks for the review! 1, 4, 5. Done 3. You're right. I've removed the offending paragraph. I had originally adapted this from the guarantees outlined in KIP-892. But it's difficult to provide these guarantees without the KIP-892 transaction buffers. Instead, we'll add the guarantees

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-09 Thread Bruno Cadonna
Hi Nick, Thanks for breaking out the KIP from KIP-892! Here are a couple of comments/questions: 1. In Kafka Streams, we have a design guideline which says to not use the "get"-prefix for getters on the public API. Could you please change getCommittedOffsets() to committedOffsets()? 2. It is

[DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-07 Thread Nick Telford
Hi everyone, Based on some offline discussion, I've split out the "Atomic Checkpointing" section from KIP-892: Transactional Semantics for StateStores, into its own KIP KIP-1035: StateStore managed changelog offsets