Thanks for the PR, Alex!

I am also glad to see this coming.

1. Configuration

I would also prefer to restrict the configuration of transactionality to the state store. Ideally, calling a method transactional() on the state store would be enough. An option on the store builder would make it possible to turn transactionality on and off (as John proposed).
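
A minimal sketch of what such a per-store switch could look like. All names here (StoreBuilderSketch, Builder, StoreSpec) are hypothetical and only illustrate the shape of the option; they are not existing Kafka Streams API.

```java
import java.util.Objects;

// Hypothetical sketch: none of these types exist in Kafka Streams.
final class StoreBuilderSketch {

    /** Immutable description of a store, as a builder might produce it. */
    static final class StoreSpec {
        final String name;
        final boolean transactional;

        StoreSpec(String name, boolean transactional) {
            this.name = Objects.requireNonNull(name, "name");
            this.transactional = transactional;
        }
    }

    /** Per-store builder: transactionality is opt-in and local to one store. */
    static final class Builder {
        private final String name;
        private boolean transactional = false; // off unless requested

        Builder(String name) {
            this.name = name;
        }

        /** Shorthand for transactional(true). */
        Builder transactional() {
            return transactional(true);
        }

        /** Explicit on/off switch, so the feature can also be disabled. */
        Builder transactional(boolean enabled) {
            this.transactional = enabled;
            return this;
        }

        StoreSpec build() {
            return new StoreSpec(name, transactional);
        }
    }

    public static void main(String[] args) {
        StoreSpec counts = new Builder("counts").transactional().build();
        StoreSpec scratch = new Builder("scratch").build();
        System.out.println(counts.name + " transactional=" + counts.transactional);
        System.out.println(scratch.name + " transactional=" + scratch.transactional);
    }
}
```

With this shape, two stores in the same application can make different choices, and no top-level config is needed.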

2. Memory usage in RocksDB

This seems to be a major issue. We do not have any guarantee that uncommitted writes fit into memory and I guess we will never have. What happens when the uncommitted writes do not fit into memory? Does RocksDB throw an exception? Can we handle such an exception without crashing?

Does the RocksDB behavior even need to be included in this KIP? In the end it is an implementation detail.

What we should consider, though, is a memory limit in some form, and what we do when that limit is exceeded.
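
One possible shape for such a limit, as a rough sketch only: a buffer that tracks the approximate size of uncommitted writes and tells the caller when the budget is exceeded, so the caller can react (for example by committing early) instead of running out of memory. The class and method names are hypothetical, and the size accounting (key bytes plus value bytes) is an assumption that ignores any per-entry overhead.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a memory-bounded buffer for uncommitted writes.
final class BoundedWriteBufferSketch {

    static final class Buffer {
        private final long maxBytes;
        private final Map<String, byte[]> pending = new LinkedHashMap<>();
        private long usedBytes = 0;

        Buffer(long maxBytes) {
            this.maxBytes = maxBytes;
        }

        /** Stages a write; returns true if the budget is now exceeded. */
        boolean put(String key, byte[] value) {
            byte[] previous = pending.put(key, value);
            if (previous != null) {
                usedBytes -= sizeOf(key, previous); // overwrite: release old entry
            }
            usedBytes += sizeOf(key, value);
            return usedBytes > maxBytes;
        }

        /** Hands over all staged writes and resets the accounting. */
        Map<String, byte[]> drain() {
            Map<String, byte[]> out = new LinkedHashMap<>(pending);
            pending.clear();
            usedBytes = 0;
            return out;
        }

        long usedBytes() {
            return usedBytes;
        }

        // Assumption: size is approximated as key bytes plus value bytes.
        private static long sizeOf(String key, byte[] value) {
            return key.getBytes(StandardCharsets.UTF_8).length + value.length;
        }
    }

    public static void main(String[] args) {
        Buffer buffer = new Buffer(10);
        for (String key : new String[] {"a", "b", "c"}) {
            if (buffer.put(key, new byte[4])) {
                // The policy is the open question: commit early, spill, or fail.
                System.out.println("limit exceeded, draining " + buffer.drain().size() + " writes");
            }
        }
    }
}
```

What the caller does on a true return value is exactly the policy question raised above; the sketch only shows where that decision point would sit.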

3. PoC

I agree with Guozhang that a PoC is a good idea to better understand the devils in the details.


On 25.05.22 01:52, Guozhang Wang wrote:
Hello Alex,

Thanks for writing the proposal! Glad to see it coming. I think this is the
kind of KIP where too many devils would be buried in the details, so it's
better to start working on a POC, either in parallel or before we resume
our discussion, rather than blocking any implementation until we are
satisfied with the proposal.

Just as a concrete example, I personally am still not 100% clear how the
proposal would work to achieve EOS with the state stores. For example, the
commit procedure today looks like this:

0: there's an existing checkpoint file indicating the changelog offset of
the local state store image is 100. Now a commit is triggered:
1. flush cache (since it contains partially processed records), make sure
all records are written to the producer.
2. flush producer, making sure all changelog records have now been acked. //
here we would get the new changelog position, say 200
3. flush store, make sure all writes are persisted.
4. producer.sendOffsetsToTransaction(); producer.commitTransaction(); // we
would make the writes in changelog up to offset 200 committed
5. Update the checkpoint file to 200.

The question is about atomicity between those lines, for example:

* If we crash between line 4 and line 5, the local checkpoint file would
stay as 100, and upon recovery we would replay the changelog from 100 to
200. This is not ideal but does not violate EOS, since the changelogs are
all overwrites anyways.
* If we crash between line 3 and 4, then at that time the local persistent
store image is representing as of offset 200, but upon recovery all
changelog records from 100 to log-end-offset would be considered as aborted
and not be replayed and we would restart processing from position 100.
Restart processing will violate EOS. I'm not sure how e.g. RocksDB's
WriteBatchWithIndex would make sure that the step 4 and step 5 could be
done atomically here.
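
The two crash cases above can be replayed with a small model. This is a deliberately simplified sketch: it tracks only three offsets (local store image, committed changelog position, checkpoint file), treats steps 1 and 2 as changing nothing durable, and uses hypothetical names throughout. It is not how Streams implements the commit.

```java
// Hypothetical model of the five commit steps and where a crash leaves us.
final class CommitCrashSketch {

    /** The three positions the discussion above keeps track of. */
    static final class Offsets {
        final long storeImage;         // offset the persisted local store reflects
        final long committedChangelog; // last changelog offset in a committed txn
        final long checkpoint;         // offset written to the checkpoint file

        Offsets(long storeImage, long committedChangelog, long checkpoint) {
            this.storeImage = storeImage;
            this.committedChangelog = committedChangelog;
            this.checkpoint = checkpoint;
        }

        /** True when the store holds writes whose transaction never committed. */
        boolean storeAheadOfCommittedChangelog() {
            return storeImage > committedChangelog;
        }

        @Override
        public String toString() {
            return "store=" + storeImage
                + " changelog=" + committedChangelog
                + " checkpoint=" + checkpoint;
        }
    }

    /** Runs the commit and "crashes" right after lastCompletedStep (5 = no crash). */
    static Offsets commit(Offsets before, long newOffset, int lastCompletedStep) {
        long store = before.storeImage;
        long changelog = before.committedChangelog;
        long checkpoint = before.checkpoint;
        // Steps 1 and 2 (flush cache, flush producer) change nothing durable here.
        if (lastCompletedStep >= 3) store = newOffset;      // step 3: flush store
        if (lastCompletedStep >= 4) changelog = newOffset;  // step 4: commit txn
        if (lastCompletedStep >= 5) checkpoint = newOffset; // step 5: checkpoint
        return new Offsets(store, changelog, checkpoint);
    }

    public static void main(String[] args) {
        Offsets start = new Offsets(100, 100, 100);
        for (int step = 2; step <= 5; step++) {
            Offsets after = commit(start, 200, step);
            System.out.println("last completed step " + step + ": " + after
                + " storeAhead=" + after.storeAheadOfCommittedChangelog());
        }
    }
}
```

In this model, only a crash right after step 3 leaves the store ahead of the committed changelog, which is the EOS-violating case; a crash right after step 4 only leaves a stale checkpoint.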

Originally what I was thinking when creating the JIRA ticket is that we
need to let the state store provide a transactional API like "token
commit()" used in step 4) above which returns a token, that e.g. in our
example above indicates offset 200, and that token would be written as part
of the records in Kafka transaction in step 5). And upon recovery the state
store would have another API like "rollback(token)" where the token is read
from the latest committed txn, and be used to rollback the store to that
committed image. I think your proposal is different, and it seems like
you're proposing we swap step 3) and 4) above, but the atomicity issue
still remains since now you may have the store image at 100 but the
changelog is committed at 200. I'd like to learn more about the details
on how it resolves such issues.
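
A rough sketch of the token-based API described above, to make the idea concrete. Everything here is hypothetical: the store is a plain in-memory map, the token is simply the changelog offset passed in by the caller (the description above has commit() take no argument), and committed images are kept as full copies, which a real store would certainly not do.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a store exposing commit() -> token and rollback(token).
final class TokenStoreSketch {

    static final class Store {
        private Map<String, String> live = new HashMap<>();
        // Committed images by token; naive full copies, for illustration only.
        private final TreeMap<Long, Map<String, String>> images = new TreeMap<>();

        void put(String key, String value) {
            live.put(key, value);
        }

        String get(String key) {
            return live.get(key);
        }

        /** Persists the current image under the given offset and returns it as the token. */
        long commit(long changelogOffset) {
            images.put(changelogOffset, new HashMap<>(live));
            return changelogOffset;
        }

        /** Restores the image committed under the token and drops any later images. */
        void rollback(long token) {
            Map<String, String> image = images.get(token);
            if (image == null) {
                throw new IllegalArgumentException("unknown token " + token);
            }
            live = new HashMap<>(image);
            images.tailMap(token, false).clear();
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        store.put("k", "v1");
        long committed = store.commit(100); // token written into the Kafka txn
        store.put("k", "v2");
        store.commit(200);                  // store committed, Kafka txn did not
        // Recovery: the latest committed txn still carries token 100.
        store.rollback(committed);
        System.out.println("k=" + store.get("k"));
    }
}
```

The point of the sketch is the recovery path: the token read from the latest committed Kafka transaction decides which store image is valid, regardless of what the store itself managed to persist.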

Anyways, that's just an example to make the point that there are lots of
implementation details which would drive the public API design, and we
should probably first do a POC, and come back to discuss the KIP. Let me
know what you think?


On Tue, May 24, 2022 at 10:35 AM Sagar <> wrote:

Hi Alexander,

Thanks for the KIP! This seems like a great proposal. I have the same
opinion as John on the Configuration part though. The two-level
config and its behaviour based on the setting/unsetting of the flag seem
confusing to me as well. Since the KIP seems specifically centred around
RocksDB it might be better to add it at the Supplier level as John
suggested.

On similar lines, the config name statestore.transactional.mechanism
also needs rethinking, as the value assigned to it (rocksdb_indexbatch)
implicitly seems to assume that RocksDB is the only state store that Kafka
Streams supports, while that's not the case.

Also, regarding the potential memory pressure that can be introduced by
WriteBatchWithIndex, do you think it might make more sense to include some
numbers/benchmarks on how much the memory consumption might increase?

Lastly, the read_uncommitted flag's behaviour on IQ may need more

These points aside, as I said, this is a great proposal!


On Tue, May 24, 2022 at 10:35 PM John Roesler <> wrote:

Thanks for the KIP, Alex!

I'm really happy to see your proposal. This improvement fills a
long-standing gap.

I have a few questions:

1. Configuration
The KIP only mentions RocksDB, but of course, Streams also ships with an
InMemory store, and users also plug in their own custom state stores. It
is also common to use multiple types of state stores in the same application
for different purposes.

Against this backdrop, the choice to configure transactionality as a
top-level config, as well as to configure the store transaction mechanism
as a top-level config, seems a bit off.

Did you consider instead just adding the option to the
RocksDB*StoreSupplier classes and the factories in Stores ? It seems like
the desire to enable the feature by default, but with a feature-flag to
disable it was a factor here. However, as you pointed out, there are some
major considerations that users should be aware of, so opt-in doesn't seem
like a bad choice, either. You could add an Enum argument to those
factories like `RocksDBTransactionalMechanism.{NONE,

Some points in favor of this approach:
* Avoid "stores that don't support transactions ignore the config"
* Users can choose how to spend their memory budget, making some stores
transactional and others not
* When we add transactional support to in-memory stores, we don't have to
figure out what to do with the mechanism config (i.e., what do you set
mechanism to when there are multiple kinds of transactional stores in the

2. caching/flushing/transactions
The coupling between memory usage and flushing that you mentioned is a bit
troubling. It also occurs to me that there seems to be some relationship
with the existing record cache, which is also an in-memory holding area for
records that are not yet written to the cache and/or store (albeit with
particular semantics). Have you considered how all these components
relate? For example, should a "full" WriteBatch actually trigger a flush so
that we don't get OOMEs? If the proposed transactional mechanism forces
uncommitted writes to be buffered in memory, until a commit, then what is
the advantage over just doing the same thing with the RecordCache and not
introducing the WriteBatch at all?

3. ALOS
You mentioned that a transactional store can help reduce duplication in
the case of ALOS. We might want to be careful about claims like that.
Duplication isn't the way that repeated processing manifests in state
stores. Rather, it is in the form of dirty reads during reprocessing. This
feature may reduce the incidence of dirty reads during reprocessing, but
not in a predictable way. During regular processing today, we will send
some records through to the changelog in between commit intervals. Under
ALOS, if any of those dirty writes gets committed to the changelog topic,
then upon failure, we have to roll the store forward to them anyway,
regardless of this new transactional mechanism. That's a fixable problem,
by the way, but this KIP doesn't seem to fix it. I wonder if we should make
any claims about the relationship of this feature to ALOS if the
behavior is so complex.

4. IQ
As a reminder, we have a new IQv2 mechanism now. Should we propose any
changes to IQv1 to support this transactional mechanism, versus just
proposing it for IQv2? Certainly, it seems strange only to propose a change
for IQv1 and not v2.

Regarding your proposal for IQv1, I'm unsure what the behavior should be
for readCommitted, since the current behavior also reads out of the
RecordCache. I guess if readCommitted==false, then we will continue to read
from the cache first, then the Batch, then the store; and if
readCommitted==true, we would skip the cache and the Batch and only read
from the persistent RocksDB store?
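
The read path guessed at above could be sketched as follows. This is only an illustration under assumptions: the three layers are modelled as plain maps, a key mapped to null stands for an uncommitted delete, and none of the names correspond to actual IQ or store classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the layered lookup order for interactive queries.
final class LayeredReadSketch {

    static final class Layers {
        final Map<String, String> recordCache = new HashMap<>();
        final Map<String, String> uncommittedBatch = new HashMap<>();
        final Map<String, String> persistentStore = new HashMap<>();

        /**
         * readCommitted == false: cache first, then batch, then store.
         * readCommitted == true: skip cache and batch, read the store only.
         * A key mapped to null in an upper layer models an uncommitted delete.
         */
        Optional<String> get(String key, boolean readCommitted) {
            if (!readCommitted) {
                if (recordCache.containsKey(key)) {
                    return Optional.ofNullable(recordCache.get(key));
                }
                if (uncommittedBatch.containsKey(key)) {
                    return Optional.ofNullable(uncommittedBatch.get(key));
                }
            }
            return Optional.ofNullable(persistentStore.get(key));
        }
    }

    public static void main(String[] args) {
        Layers layers = new Layers();
        layers.persistentStore.put("k", "committed");
        layers.uncommittedBatch.put("k", "batched");
        layers.recordCache.put("k", "cached");
        System.out.println("readCommitted=false -> " + layers.get("k", false));
        System.out.println("readCommitted=true  -> " + layers.get("k", true));
    }
}
```

The sketch assumes everything in the persistent store is committed, which only holds if uncommitted writes never reach it before the commit.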

What should IQ do if I request to readCommitted on a non-transactional
store?

Thanks again for proposing the KIP, and my apologies for the long reply;
I'm hoping to air all my concerns in one "batch" to save time for you.


On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
Hi all,

I've written a KIP for making Kafka Streams state stores transactional and
would like to start a discussion:


