Eduwer,
thanks for your follow-up on this work. I wanted to close the loop on it
for visibility.
Eduwer, Bill, and I looked at the proposal (i.e., the PR) and discussed
it in the background. We have concerns about changing the
`StateStore` interface. This interface is very central, and the proposal
does not seem to be a step in the right direction: it would make the
interface and how it interacts with the runtime too complex.
Thus, we decided not to move forward with this, and to keep the design of
KIP-1035 as-is.
To fix the underlying issue that the current (partial) implementation of
KIP-1035 introduces, we instead want to drop the optimization of
re-using "startup tasks". At the moment, we create startup tasks, keep
them open, and hand them over to StreamThreads after the first
rebalance as needed. This is an optimization to reduce the overhead of
opening/closing tasks (and their state stores). We will change the
implementation to drop this optimization: we will close all
startup tasks before the first rebalance, and let StreamThreads
re-create assigned tasks as always.
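
For illustration only, the planned ordering (open startup tasks, close them all before the first rebalance, then re-create whatever gets assigned) can be sketched as below. This is a minimal model, not Kafka Streams code: `SketchTask`, `StartupTaskSketch`, and all method names are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a task that owns open state stores; not a Kafka Streams class.
final class SketchTask {
    final String id;
    boolean open = true;

    SketchTask(String id) { this.id = id; }

    void close() { open = false; }
}

// Hypothetical model of the startup sequence described above.
final class StartupTaskSketch {
    // Tasks opened at startup for state found in the local state directory.
    private final Map<String, SketchTask> startupTasks = new LinkedHashMap<>();
    final List<String> log = new ArrayList<>();

    void initializeStartupTasks(List<String> taskIdsOnDisk) {
        for (String id : taskIdsOnDisk) {
            startupTasks.put(id, new SketchTask(id));
            log.add("open startup " + id);
        }
    }

    // Planned behavior: close every startup task before the first rebalance.
    void closeStartupTasks() {
        for (SketchTask task : startupTasks.values()) {
            task.close();
            log.add("close startup " + task.id);
        }
        startupTasks.clear();
    }

    // After the rebalance, a thread re-creates each assigned task from scratch.
    List<SketchTask> createAssignedTasks(List<String> assignedIds) {
        List<SketchTask> created = new ArrayList<>();
        for (String id : assignedIds) {
            created.add(new SketchTask(id));
            log.add("create " + id);
        }
        return created;
    }

    boolean hasStartupTasks() { return !startupTasks.isEmpty(); }

    public static void main(String[] args) {
        StartupTaskSketch sketch = new StartupTaskSketch();
        sketch.initializeStartupTasks(List.of("0_0", "0_1"));
        sketch.closeStartupTasks();
        sketch.createAssignedTasks(List.of("0_1"));
        sketch.log.forEach(System.out::println);
    }
}
```

The extra open/close pair per startup task in the log is the one-time cost discussed in this message.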
The tradeoff is that the startup time of a Kafka Streams application, *if*
there are local state directories, might increase somewhat because
opening/closing startup tasks is not free. But it's a one-time cost
only, so it seems to be the better tradeoff compared to "messing" with
the `StateStore` interface and introducing unnecessary complexity into
the code base.
We also don't know for sure how big the impact on startup times will
actually be. If it really becomes a problem, we can reevaluate;
it's still possible to add some optimization in future releases.
-Matthias
On 11/21/25 8:44 AM, Eduwer Camacaro wrote:
Hello everyone, I've been looking into the specifics of this KIP's
implementation. At Littlehorse, we are very interested in this KIP, and we
would like to contribute to its implementation and/or code review. This KIP
is a crucial step toward KIP-892 and should enhance standby performance, as
it won't cause RocksDB flushes every 10,000 records.
I put some effort into understanding the code around StateStores and offset
computation, and I also looked over the PR that was already merged and the
other one that is still in progress.
Additionally, I was told about this issue (KAFKA-19434) that is related to
this KIP. I believe that this issue is mainly caused by the fact that we
create Tasks in the StateDirectory class (see
StateDirectory#initializeStartupTasks), and as a consequence, these tasks
get assigned to the main thread. That’s why I started to explore an
alternative that opens StateStores instead of Tasks during this
initialization process, but then I realized that it is not an easy change
because the StateStore interface doesn’t support opening stores without
being assigned to a stream thread.
According to my understanding, the StateStore lifecycle is as follows:
1) During topology build time, state factory classes call store
constructors to create instances.
2) During Kafka Streams rebalances, the “StateStore#init” method opens
stores and assigns them to StreamThreads. Metrics must be registered at
this point.
3) Kafka Streams calls “StateStore#close” to close the instance.
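
The three lifecycle steps above can be sketched with a simplified, hypothetical store. `LifecycleStore` is not a Kafka class, and its `init(String)` only stands in for the real “StateStore#init”, which takes a context and a root store rather than a thread name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified store used only to illustrate the three steps.
final class LifecycleStore {
    final String name;
    final List<String> events = new ArrayList<>();
    private String ownerThread; // set in init, cleared in close
    private boolean open;

    // Step 1: constructed at topology build time; nothing is opened yet.
    LifecycleStore(String name) {
        this.name = name;
        events.add("constructed");
    }

    // Step 2: opened and bound to a thread; metrics are registered here.
    void init(String threadName) {
        if (open) {
            throw new IllegalStateException("already open: " + name);
        }
        ownerThread = threadName;
        open = true;
        events.add("init on " + threadName);
        events.add("metrics registered");
    }

    // Step 3: closed by the runtime.
    void close() {
        open = false;
        ownerThread = null;
        events.add("closed");
    }

    boolean isOpen() { return open; }

    String ownerThread() { return ownerThread; }
}

final class LifecycleSketch {
    public static void main(String[] args) {
        LifecycleStore store = new LifecycleStore("counts");
        store.init("StreamThread-1");
        store.close();
        store.events.forEach(System.out::println);
    }
}
```

In this model there is no way to open the store without also binding it to a thread, which is the limitation described earlier in this message.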
However, from what I can see, this process is slightly different for
RocksDBStore. RocksDBStore has an extra method,
“RocksDBStore#openDB”. This method only opens the RocksDB instance and
makes it writable. Segmented stores use this method to open multiple
segments, which are RocksDBStore instances, and never assign the segment
store to the stream thread (“StateStore#init”) (see
KeyValueSegments#getOrCreateSegment).
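
A rough sketch of that pattern, where segments are opened through an `openDB`-style call and never go through the init path, might look like this. `SegmentSketch` and `SegmentsSketch` are hypothetical stand-ins with simplified signatures; they are not the real RocksDBStore or KeyValueSegments classes.

```java
import java.util.TreeMap;

// Hypothetical stand-in for a RocksDB-backed segment.
final class SegmentSketch {
    final String name;
    boolean dbOpen;      // true once the underlying database is open and writable
    boolean initialized; // true only if the init-style path ran

    SegmentSketch(String name) { this.name = name; }

    // Opens the underlying database only; no thread assignment, no metrics.
    void openDB() { dbOpen = true; }

    // Full init path: opens the database and binds the store to a thread.
    void init(String threadName) {
        openDB();
        initialized = true;
    }
}

// Hypothetical container that lazily creates segments by id.
final class SegmentsSketch {
    private final TreeMap<Long, SegmentSketch> segments = new TreeMap<>();
    private final String storeName;

    SegmentsSketch(String storeName) { this.storeName = storeName; }

    // Returns the existing segment, or creates one and opens it without init.
    SegmentSketch getOrCreateSegment(long segmentId) {
        return segments.computeIfAbsent(segmentId, id -> {
            SegmentSketch segment = new SegmentSketch(storeName + "." + id);
            segment.openDB();
            return segment;
        });
    }

    int size() { return segments.size(); }

    public static void main(String[] args) {
        SegmentsSketch segments = new SegmentsSketch("window-store");
        SegmentSketch segment = segments.getOrCreateSegment(3L);
        System.out.println(segment.name + " dbOpen=" + segment.dbOpen
                + " initialized=" + segment.initialized);
    }
}
```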
With that in mind, I think we can use a similar approach when discovering
stores already present in the state directory and open the store instance
without calling the #init method. But that will require us to modify this
KIP in order to add a new method to the `StateStore` API. I pushed a PR to
demonstrate this alternative in code:
https://github.com/apache/kafka/pull/20749.
Let me know what you think about this alternative.
On Sun, Apr 7, 2024 at 10:36, Nick Telford (<[email protected]>)
wrote:
Hi everyone,
Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP:
KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.
All the changes introduced in KIP-1035 have been removed from KIP-892, and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.
I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.
Regards,
Nick