Thanks for the KIP, Alex!

I'm really happy to see your proposal. This improvement fills a long-standing 
gap.

I have a few questions:

1. Configuration
The KIP only mentions RocksDB, but of course, Streams also ships with an 
InMemory store, and users also plug in their own custom state stores. It is 
also common to use multiple types of state stores in the same application for 
different purposes.

Against this backdrop, the choice to configure transactionality as a top-level 
config, as well as to configure the store transaction mechanism as a top-level 
config, seems a bit off.

Did you consider instead just adding the option to the RocksDB*StoreSupplier 
classes and the factories in Stores? It seems like the desire to enable the 
feature by default, but with a feature flag to disable it, was a factor here. 
However, as you pointed out, there are some major considerations that users 
should be aware of, so opt-in doesn't seem like a bad choice, either. You could 
add an Enum argument to those factories like 
`RocksDBTransactionalMechanism.{NONE,

Some points in favor of this approach:
* Avoid "stores that don't support transactions ignore the config" complexity
* Users can choose how to spend their memory budget, making some stores 
transactional and others not
* When we add transactional support to in-memory stores, we don't have to 
figure out what to do with the mechanism config (i.e., what do you set the 
mechanism to when there are multiple kinds of transactional stores in the 
topology?)
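
To make the per-store idea concrete, here is a rough sketch of what an opt-in 
argument on the factories might look like. Everything in it is hypothetical: 
the `*Sketch` types are stand-ins and not Kafka Streams classes, and apart from 
NONE the enum constant is a placeholder, not a mechanism named in the KIP.

```java
// Hypothetical enum: only NONE comes from this thread; the second constant is
// a placeholder for whatever mechanisms the KIP ends up defining.
enum TxnMechanismSketch { NONE, PLACEHOLDER_MECHANISM }

// Hypothetical stand-in for a store supplier (not a Kafka Streams class).
final class StoreSupplierSketch {
    final String name;
    final TxnMechanismSketch mechanism;

    StoreSupplierSketch(String name, TxnMechanismSketch mechanism) {
        this.name = name;
        this.mechanism = mechanism;
    }

    // A store is transactional only if it was explicitly opted in.
    boolean transactional() {
        return mechanism != TxnMechanismSketch.NONE;
    }
}

// Hypothetical stand-in for the factory methods in Stores.
final class StoresSketch {
    // Existing-style factory: behavior unchanged, no transactions.
    static StoreSupplierSketch persistentKeyValueStore(String name) {
        return new StoreSupplierSketch(name, TxnMechanismSketch.NONE);
    }

    // New overload: the caller chooses the mechanism per store.
    static StoreSupplierSketch persistentKeyValueStore(String name, TxnMechanismSketch mechanism) {
        return new StoreSupplierSketch(name, mechanism);
    }
}
```

With something like this, one topology could hold a transactional "counts" 
store next to a non-transactional "lookup" store, and no top-level config is 
needed.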

2. caching/flushing/transactions
The coupling between memory usage and flushing that you mentioned is a bit 
troubling. It also occurs to me that there seems to be some relationship with 
the existing record cache, which is also an in-memory holding area for records 
that are not yet written to the changelog and/or store (albeit with no particular 
semantics). Have you considered how all these components should relate? For 
example, should a "full" WriteBatch actually trigger a flush so that we don't 
get OOMEs? If the proposed transactional mechanism forces all uncommitted 
writes to be buffered in memory, until a commit, then what is the advantage 
over just doing the same thing with the RecordCache and not introducing the 
WriteBatch at all?
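
As one possible answer to the "full WriteBatch" question, here is a toy sketch 
of a bounded buffer that flushes itself when it reaches a limit. It is not how 
RocksDB's WriteBatch or the RecordCache work; the class, its entry-count limit, 
and the plain maps standing in for the batch and the store are all assumptions 
for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: uncommitted writes are held in memory and pushed to the
// "store" either on an explicit flush or when the buffer reaches its limit.
final class BoundedWriteBufferSketch {
    private final Map<String, String> uncommitted = new HashMap<>();
    private final Map<String, String> store = new HashMap<>();
    private final int maxEntries; // assumed budget, counted in entries for simplicity
    private int flushCount = 0;

    BoundedWriteBufferSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void put(String key, String value) {
        uncommitted.put(key, value);
        // A "full" buffer forces a flush instead of growing without bound.
        if (uncommitted.size() >= maxEntries) {
            flush();
        }
    }

    void flush() {
        store.putAll(uncommitted);
        uncommitted.clear();
        flushCount++;
    }

    int uncommittedSize() { return uncommitted.size(); }
    int flushCount() { return flushCount; }
    String committedValue(String key) { return store.get(key); }
}
```

Note that a size-triggered flush like this writes to the store before the 
regular commit, so it would presumably have to be tied to an early commit to 
keep the transactional guarantee.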

3. ALOS
You mentioned that a transactional store can help reduce duplication in the 
case of ALOS. We might want to be careful about claims like that. Duplication 
isn't the way that repeated processing manifests in state stores. Rather, it is 
in the form of dirty reads during reprocessing. This feature may reduce the 
incidence of dirty reads during reprocessing, but not in a predictable way. 
During regular processing today, we will send some records through to the 
changelog in between commit intervals. Under ALOS, if any of those dirty writes 
gets committed to the changelog topic, then upon failure, we have to roll the 
store forward to them anyway, regardless of this new transactional mechanism. 
That's a fixable problem, by the way, but this KIP doesn't seem to fix it. I 
wonder if we should make any claims about the relationship of this feature to 
ALOS if the real-world behavior is so complex.

4. IQ
As a reminder, we have a new IQv2 mechanism now. Should we propose any changes 
to IQv1 to support this transactional mechanism, versus just proposing it for 
IQv2? Certainly, it seems strange only to propose a change for IQv1 and not v2.

Regarding your proposal for IQv1, I'm unsure what the behavior should be for 
readCommitted, since the current behavior also reads out of the RecordCache. I 
guess if readCommitted==false, then we will continue to read from the cache 
first, then the Batch, then the store; and if readCommitted==true, we would 
skip the cache and the Batch and only read from the persistent RocksDB store?
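
In code, the lookup order I am guessing at would be roughly the following. 
This is only a sketch of my reading, not the KIP's design: the class is 
hypothetical, and plain maps stand in for the RecordCache, the Batch, and the 
RocksDB store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical three-layer read path: cache, then uncommitted batch, then store.
final class LayeredReadSketch {
    final Map<String, String> cache = new HashMap<>(); // stand-in for the RecordCache
    final Map<String, String> batch = new HashMap<>(); // stand-in for uncommitted writes
    final Map<String, String> store = new HashMap<>(); // stand-in for committed RocksDB data

    Optional<String> get(String key, boolean readCommitted) {
        if (!readCommitted) {
            // Newest data first: the cache shadows the batch, which shadows the store.
            if (cache.containsKey(key)) {
                return Optional.ofNullable(cache.get(key));
            }
            if (batch.containsKey(key)) {
                return Optional.ofNullable(batch.get(key));
            }
        }
        // readCommitted == true skips both in-memory layers.
        return Optional.ofNullable(store.get(key));
    }
}
```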

What should IQ do if I request readCommitted on a non-transactional store?

Thanks again for proposing the KIP, and my apologies for the long reply; I'm 
hoping to air all my concerns in one "batch" to save time for you.

Thanks,
-John

On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
> Hi all,
>
> I've written a KIP for making Kafka Streams state stores transactional and
> would like to start a discussion:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
>
> Best,
> Alex
