Hi Matthias,
I see what you mean.
To sum up:
With this KIP the .checkpoint file is written when the store closes.
That is when:
1. a task moves away from the Kafka Streams client
2. Kafka Streams client shuts down
A Kafka Streams client needs the information in the .checkpoint file
1. on
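The close-time write described above can be sketched roughly as follows. This is a simplified stand-in, not the real implementation: the actual .checkpoint format is Kafka's internal OffsetCheckpoint file, and the class and method names here are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the .checkpoint file: one "partition offset" line
// per changelog partition (the real format is Kafka's OffsetCheckpoint).
public class CheckpointSketch {

    // Written when the store closes, i.e. when a task migrates away
    // or the Kafka Streams client shuts down.
    static void writeCheckpoint(Path file, Map<String, Long> offsets) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            sb.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
        }
        Files.writeString(file, sb.toString());
    }

    // Read on restart, before any store is reopened.
    static Map<String, Long> readCheckpoint(Path file) throws IOException {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (String line : Files.readAllLines(file)) {
            if (line.isBlank()) continue;
            int i = line.lastIndexOf(' ');
            offsets.put(line.substring(0, i), Long.parseLong(line.substring(i + 1)));
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("state", ".checkpoint");
        writeCheckpoint(file, Map.of("changelog-0", 42L));
        System.out.println(readCheckpoint(file)); // {changelog-0=42}
    }
}
```

The point of contention in the thread is exactly that this file is only written on close, so a crash between commits leaves it stale.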
Those are good questions... I can think of a few approaches, but I admit
it might all be a bit tricky to code up...
However, if we don't solve this problem, I don't think this KIP really
solves the core issue we are facing. In the end, if we rely on the
`.checkpoint` file to compute a
Hi Matthias,
200:
I like the idea in general. However, it is not clear to me how the
behavior should be with multiple stream threads in the same Kafka
Streams client. Which stream thread opens which store? How can a stream
thread pass an open store to another stream thread that got the
101: Yes, but what I am saying is that we don't need to flush the
.position file to disk periodically, but only maintain it in main
memory, and only write it to disk on close() to preserve it across
restarts. This way, it would never be ahead, but might only lag? But
with my better
Hi Matthias,
101:
Let's assume a RocksDB store, but I think the following might be true
also for other store implementations. With this KIP, if Kafka Streams
commits the offsets, the committed offsets will be stored in an
in-memory data structure (i.e. the memtable) and stay there until
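The durability gap Bruno describes can be illustrated without real RocksDB. The sketch below is a toy model (class and key names are hypothetical): a put() lands in the memtable, only flush() makes it durable, so an offset written at commit time is lost if the process crashes before the next flush.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of RocksDB durability: put() lands in the in-memory memtable;
// only flush() moves data into (simulated) on-disk SST files. Offsets
// written at commit time therefore stay memory-only until the next flush.
public class MemtableSketch {
    final Map<String, String> memtable = new HashMap<>();
    final Map<String, String> sstFiles = new HashMap<>(); // stand-in for SSTs

    void put(String key, String value) { memtable.put(key, value); }

    void flush() { // memtable -> disk
        sstFiles.putAll(memtable);
        memtable.clear();
    }

    MemtableSketch crashAndRecover() { // memtable contents are gone
        MemtableSketch recovered = new MemtableSketch();
        recovered.sstFiles.putAll(sstFiles);
        return recovered;
    }

    public static void main(String[] args) {
        MemtableSketch store = new MemtableSketch();
        store.put("offset:changelog-0", "42"); // committed offset, memtable only
        MemtableSketch recovered = store.crashAndRecover();
        // The committed offset did not survive the crash:
        System.out.println(recovered.sstFiles.containsKey("offset:changelog-0")); // false
    }
}
```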
Thanks Bruno.
101: I think I understand this better now, but I just want to make sure
I do. What do you mean by "they can diverge" and "Recovering after a
failure might load inconsistent offsets and positions"?
The checkpoint is the offset from the changelog, while the position is
the
Hi all,
100
I think we already have such a wrapper. It is called
AbstractReadWriteDecorator.
101
Currently, the position is checkpointed when an offset checkpoint is
written. If we let the state store manage the committed offsets, we also
need to let the state store manage the position
Thanks for splitting out this KIP. The discussion shows that it is a
complex beast by itself, so it is worth discussing on its own.
A couple of questions/comments:
100 `StateStore#commit()`: The JavaDoc says "must not be called by
users" -- I would propose to put a guard in place for this, by
Hi all,
How should we proceed here?
1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally
existing task state
While I like option 2, I think option 1 is less risky and will give us
the benefits of transactional state stores sooner. We
Hi Nick and Sophie,
I think the task ID is not enough to create a state store that can read
the offsets of non-assigned tasks for lag computation during
rebalancing. The state store also needs the state directory so that it
knows where to find the information that it needs to return from
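Since the client has no open stores for unassigned tasks, the only source of truth during rebalance is what is on disk under the state directory. A rough sketch of that scan (the directory layout and names here are a simplified stand-in for Kafka Streams' state.dir, not the real code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// During rebalance the client must report lag for tasks it does NOT have
// open stores for, so it can only go by what is on disk: walk the state
// directory and collect every task directory that left a checkpoint behind.
public class LocalStateScan {
    static List<String> tasksWithLocalState(Path stateDir) throws IOException {
        try (Stream<Path> taskDirs = Files.list(stateDir)) {
            return taskDirs
                .filter(Files::isDirectory)
                .filter(dir -> Files.exists(dir.resolve(".checkpoint")))
                .map(dir -> dir.getFileName().toString())
                .sorted()
                .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path stateDir = Files.createTempDirectory("state");
        Files.createDirectories(stateDir.resolve("0_0"));
        Files.writeString(stateDir.resolve("0_0").resolve(".checkpoint"), "changelog-0 42\n");
        Files.createDirectories(stateDir.resolve("0_1")); // no checkpoint left behind
        System.out.println(tasksWithLocalState(stateDir)); // [0_0]
    }
}
```

This is exactly why the task ID alone is not enough: without the state directory path, there is nothing to scan.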
That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.
It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which
I don't think we need to *require* a constructor that accepts the TaskId,
but we would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskId (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for
Hi Sophie,
Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the TaskId.
Is it enough to add the parameter to the StoreSupplier?
Would doing this be in-scope for this KIP, or are we over-complicating it?
Nick
On
Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called. This
has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky to
actually
On further thought, it's clear that this can't work for one simple reason:
StateStores don't know their associated TaskId (and hence, their
StateDirectory) until the init() call. Therefore, committedOffset() can't
be called before init(), unless we also added a StateStoreContext argument
to
Hi Bruno,
Immediately after I sent my response, I looked at the codebase and came to
the same conclusion. If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil
Hi Nick,
Thanks for reacting to my comments so quickly!
2.
Some thoughts on your proposal.
State managers (and state stores) are part of tasks. If the task is not
assigned locally, we do not create those tasks. To get the offsets with
your approach, we would need to either create kind of
Hi Bruno,
Thanks for the review!
1, 4, 5.
Done
3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult to
provide these guarantees without the KIP-892 transaction buffers. Instead,
we'll add the guarantees
Hi Nick,
Thanks for breaking out the KIP from KIP-892!
Here are a couple of comments/questions:
1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?
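Applied to the proposed accessor, the guideline looks like the sketch below. The signature and the nested TopicPartition stand-in are guesses at the KIP's shape, not the actual API.

```java
import java.util.Map;

public class NamingSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Kafka Streams public-API naming guideline: accessors drop the
    // "get" prefix, so committedOffsets(), not getCommittedOffsets().
    interface OffsetReportingStore {
        Map<TopicPartition, Long> committedOffsets();
    }

    public static void main(String[] args) {
        OffsetReportingStore store =
            () -> Map.of(new TopicPartition("changelog", 0), 42L);
        System.out.println(store.committedOffsets());
    }
}
```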
2.
It is
Hi everyone,
Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP
KIP-1035: StateStore managed changelog offsets