Re: [DISCUSS] KIP-844: Transactional State Stores

2024-02-22 Thread Matthias J. Sax
To close the loop on this thread: KIP-892 was accepted and is currently being implemented. Thus, I'll go ahead and mark this KIP as discarded. Thanks a lot, Alex, for spending so much time on this very important feature! Without your groundwork, we would not have KIP-892 and your contributions are

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-11-21 Thread Nick Telford
Hi Alex, Thanks for getting back to me. I actually have most of a working implementation already. I'm going to write it up as a new KIP, so that it can be reviewed independently of KIP-844. Hopefully, working together we can have it ready sooner. I'll keep you posted on my progress. Regards,

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-11-21 Thread Alexander Sorokoumov
Hey Nick, Thank you for the prototype testing and benchmarking, and sorry for the late reply! I agree that it is worth revisiting the WriteBatchWithIndex approach. I will implement a fork of the current prototype that uses that mechanism to ensure transactionality and let you know when it is

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-11-11 Thread Nick Telford
Hi everyone, Sorry to dredge this up again. I've had a chance to start doing some testing with the WIP Pull Request, and it appears as though the secondary store solution performs rather poorly. In our testing, we had a non-transactional state store that would restore (from scratch), at a rate

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-10-06 Thread Alexander Sorokoumov
Hey Nick, It is going to be option c. Existing state is considered to be committed and there will be an additional RocksDB for uncommitted writes. I am out of office until October 24. I will update KIP and make sure that we have an upgrade test for that after coming back from vacation. Best,

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-10-06 Thread Nick Telford
Hi everyone, I realise this has already been voted on and accepted, but it occurred to me today that the KIP doesn't define the migration/upgrade path for existing non-transactional StateStores that *become* transactional, i.e. by adding the transactional boolean to the StateStore constructor.

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-09-01 Thread Alexander Sorokoumov
Hey Guozhang, Sounds good. I annotated all added StateStore methods (commit, recover, transactional) with @Evolving. Best, Alex On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang wrote: > Hello Alex, > > Thanks for the detailed replies, I think that makes sense, and in the long > run we would

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-31 Thread Guozhang Wang
Hello Alex, Thanks for the detailed replies, I think that makes sense, and in the long run we would need some public indicators from StateStore to determine if checkpoints can really be used to indicate clean snapshots. As for the @Evolving label, I think we can still keep it but for a different

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-26 Thread Alexander Sorokoumov
Hey Guozhang, I think that we will have to keep StateStore#transactional() because post-commit checkpointing of non-txn state stores will break the guarantees we want in ProcessorStateManager#initializeStoreOffsetsFromCheckpoint for correct recovery. Let's consider checkpoint-recovery behavior

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-18 Thread Guozhang Wang
Hello Alex, Thanks for the replies! > As long as we allow custom user implementations of that interface, we should probably either keep that flag to distinguish between transactional and non-transactional implementations or change the contract behind the interface. What do you think? Regarding

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-17 Thread Sagar
Hi Alex, Thanks for your response. Yeah, I kind of mixed up the EOS behaviour with state stores. Thanks for clarifying. I think what you are suggesting with respect to querying secondary stores makes sense. What I had imagined was trying to leverage the fact that the keys are sorted in a known order. So, before

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-17 Thread Alexander Sorokoumov
Hey Sagar, I'll start from the end. > if EOS is enabled, it would return only committed data. I think you might be referring to Kafka's consumer isolation levels. To my knowledge, they only work for consuming data from a topic. For example, READ_COMMITTED prevents reading data from an ongoing Kafka

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-17 Thread Sagar
Hi Alex, I went through the KIP again and it looks good to me. I just had a question on the secondary state stores: *All writes and deletes go to the temporary store. Reads query the temporary store; if the data is missing, query the regular store. Range reads query both stores and return a
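The read/write rules quoted from the KIP above (writes and deletes go to a temporary store, point reads fall back to the regular store, range reads consult both) can be sketched in a few lines of Java. This is a minimal in-memory illustration, not the KIP-844 prototype: it assumes two sorted maps stand in for the two RocksDB instances, the class and method names are hypothetical, and both the range-merge policy and the commit/abort behaviour are assumptions because the quoted sentence is cut off.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch of the "secondary store" idea; not the KIP-844 code.
public class SecondaryStoreSketch {
    // Stands in for the regular (committed) store.
    private final TreeMap<String, String> committed = new TreeMap<>();
    // Stands in for the temporary store; Optional.empty() marks a pending delete.
    private final TreeMap<String, Optional<String>> uncommitted = new TreeMap<>();

    // All writes go to the temporary store.
    public void put(String key, String value) {
        uncommitted.put(key, Optional.of(value));
    }

    // Deletes also go to the temporary store, recorded as a tombstone.
    public void delete(String key) {
        uncommitted.put(key, Optional.empty());
    }

    // Query the temporary store first; on a miss, query the regular store.
    public String get(String key) {
        Optional<String> pending = uncommitted.get(key);
        if (pending != null) {
            return pending.orElse(null); // a tombstone hides the committed value
        }
        return committed.get(key);
    }

    // Range read over [from, to] that queries both stores. Assumed merge policy:
    // temporary-store entries (including tombstones) shadow committed ones.
    public NavigableMap<String, String> range(String from, String to) {
        TreeMap<String, String> result = new TreeMap<>(committed.subMap(from, true, to, true));
        for (Map.Entry<String, Optional<String>> e : uncommitted.subMap(from, true, to, true).entrySet()) {
            if (e.getValue().isPresent()) {
                result.put(e.getKey(), e.getValue().get());
            } else {
                result.remove(e.getKey());
            }
        }
        return result;
    }

    // Assumed commit: fold pending writes/deletes into the regular store, then clear them.
    public void commit() {
        for (Map.Entry<String, Optional<String>> e : uncommitted.entrySet()) {
            if (e.getValue().isPresent()) {
                committed.put(e.getKey(), e.getValue().get());
            } else {
                committed.remove(e.getKey());
            }
        }
        uncommitted.clear();
    }

    // Assumed abort: discard all uncommitted writes and deletes.
    public void abort() {
        uncommitted.clear();
    }

    public static void main(String[] args) {
        SecondaryStoreSketch store = new SecondaryStoreSketch();
        store.put("a", "1");
        store.put("b", "2");
        store.commit();
        store.put("b", "20");
        store.delete("a");
        store.put("c", "3");
        System.out.println(store.get("a"));        // null (pending delete)
        System.out.println(store.range("a", "z")); // {b=20, c=3}
        store.abort();
        System.out.println(store.range("a", "z")); // {a=1, b=2}
    }
}
```

Keeping tombstones in the temporary store is what lets an uncommitted delete hide a committed value without touching the regular store until commit.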

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-15 Thread Alexander Sorokoumov
Hey Guozhang, Thank you for elaborating! I like your idea to introduce a StreamsConfig specifically for the default store APIs. You mentioned Materialized, but I think changes in StreamJoined follow the same logic. I updated the KIP and the prototype according to your suggestions: * Add a new

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-10 Thread Guozhang Wang
Hello Alex, Thanks for the replies. Regarding the global config vs. per-store spec, I agree with John's early comments to some degree, but I think we may well distinguish a couple of scenarios here. In sum, we are discussing the following levels of per-store spec: *

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-10 Thread Alexander Sorokoumov
Hey Guozhang, Bruno, Thank you for your feedback. I am going to respond to both of you in a single email. I hope it is okay. @Guozhang, > We could, instead, have a global config to specify if the built-in stores should be transactional or not. This was the original approach I took in this

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-10 Thread Bruno Cadonna
Hi Alex, Thanks a lot for explaining! Now some aspects are clearer to me. While I now understand how the state store can roll forward, I have the feeling that rolling forward is specific to the 2-state-store implementation with RocksDB of your PoC. Other state store implementations might

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-07 Thread Guozhang Wang
Hello Alex, I made a pass on your WIP PR (thanks for putting it up! It helps for me to understand many details). And here are some more thoughts: 1. I looked over all the places where `.transactional()` is called, and I still think that they are not needed, and hence we could consider removing

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-04 Thread Alexander Sorokoumov
Hey Bruno, Thank you for the suggestions and the clarifying questions. I believe that they cover the core of this proposal, so it is crucial for us to be on the same page. > 1. Don't you want to deprecate StateStore#flush()? Good call! I updated both the proposal and the prototype. > 2. I would

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-07-29 Thread Bruno Cadonna
Hi Alex, Thanks for the updates! 1. Don't you want to deprecate StateStore#flush()? As far as I understand, commit() is the new flush(), right? If you do not deprecate it, you don't get rid of the room for error you describe in your KIP by having both a flush() and a commit(). 2. I would shorten

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-07-27 Thread Alexander Sorokoumov
Hey Nick, Thank you for the kind words and the feedback! I'll definitely add an option to configure the transactional mechanism in the Stores factory method via an argument, as John previously suggested, and might add the in-memory option via RocksDB Indexed Batches if I figure out why their creation via

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-07-27 Thread Alexander Sorokoumov
Hey Guozhang, > 1) About the param passed into the `recover()` function: it seems to me that the semantics of "recover(offset)" is: recover this state to a transaction boundary which is at least the passed-in offset. And the only possibility that the returned offset is different than the

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-07-26 Thread Nick Telford
Hi Alex, Excellent proposal, I'm very keen to see this land! Would it be useful to permit configuring the type of store used for uncommitted offsets on a store-by-store basis? This way, users could choose whether to use, e.g. an in-memory store or RocksDB, potentially reducing the overheads

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-07-25 Thread Guozhang Wang
Hello Alex, Thanks for the updated KIP, I looked over it and browsed the WIP and just have a couple meta thoughts: 1) About the param passed into the `recover()` function: it seems to me that the semantics of "recover(offset)" is: recover this state to a transaction boundary which is at least

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-07-14 Thread Alexander Sorokoumov
Hi, I updated the KIP with the following changes: * Replaced in-memory batches with the secondary-store approach as the default implementation to address the feedback about memory pressure as suggested by Sagar and Bruno. * Introduced StateStore#commit and StateStore#recover methods as an

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-06-07 Thread Alexander Sorokoumov
Hi, As a status update, I made the following changes to the KIP: * replaced configuration via the top-level config with configuration via the Stores factory and StoreSuppliers, * added IQv2 and elaborated how readCommitted will work when the store is not transactional, * removed claims about ALOS. I

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-06-01 Thread Guozhang Wang
Alex, Thanks for your replies! That is very helpful. Just to broaden our discussions a bit here, I think there are some other approaches in parallel to the idea of "enforce to only persist upon explicit flush" and I'd like to throw one here -- not really advocating it, but just for us to compare

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-06-01 Thread Alexander Sorokoumov
Hi Guozhang, > But I'm still trying to clarify how it guarantees EOS, and it seems that we would achieve it by enforcing to not persist any data written within this transaction until step 4. Is that correct? This is correct. Both alternatives - in-memory WriteBatchWithIndex and

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-06-01 Thread Guozhang Wang
Hello Alex, > we flush the cache, but not the underlying state store. You're right. The ordering I mentioned above is actually: ... 3. producer.sendOffsetsToTransaction(); producer.commitTransaction(); 4. flush store, make sure all writes are persisted. 5. Update the checkpoint file to 200.
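The ordering Guozhang describes (steps 3 to 5) can be sketched as follows. This is a hypothetical illustration: the three single-method interfaces are stand-ins for the producer's transaction calls, the store flush, and the checkpoint-file write, not Kafka's actual APIs, and steps 1 and 2 are elided in the excerpt, so they are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the commit ordering quoted above (steps 3-5 only).
// The interfaces are stand-ins, not Kafka's producer or StateStore APIs.
public class CommitOrderSketch {
    interface TxnProducer {
        void sendOffsetsAndCommit(); // stands in for step 3
    }

    interface FlushableStore {
        void flush();                // stands in for step 4
    }

    interface CheckpointWriter {
        void write(long offset);     // stands in for step 5
    }

    static void commit(TxnProducer producer, FlushableStore store,
                       CheckpointWriter checkpoint, long offset) {
        // 3. Send offsets to the transaction and commit it.
        producer.sendOffsetsAndCommit();
        // 4. Flush the store so all writes are persisted.
        store.flush();
        // 5. Update the checkpoint last, only after the flush has completed.
        checkpoint.write(offset);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        commit(() -> log.add("commitTransaction"),
               () -> log.add("flush"),
               off -> log.add("checkpoint=" + off),
               200L);
        System.out.println(log); // [commitTransaction, flush, checkpoint=200]
    }
}
```

Because the checkpoint is written last, a crash anywhere between steps 3 and 5 leaves the checkpoint file at its previous offset rather than at 200.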

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-05-31 Thread Alexander Sorokoumov
Hey, Thank you for the wealth of great suggestions and questions! I am going to address the feedback in batches and update the proposal async, as it is probably going to be easier for everyone. I will also write a separate message after making updates to the KIP. @John, > Did you consider

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-05-30 Thread Suhas Satish
Thanks for the KIP proposal, Alex. 1. Configuration default: You mention applications using the Streams DSL with the built-in RocksDB state store will get transactional state stores by default when EOS is enabled, but the default implementation for apps using PAPI will fall back to non-transactional

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-05-30 Thread Bruno Cadonna
Thanks for the PR, Alex! I am also glad to see this coming. 1. Configuration: I would also prefer to restrict the configuration of transactional on the state store. Ideally, calling method transactional() on the state store would be enough. An option on the store builder would make it

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-05-25 Thread Guozhang Wang
Also to dump my thoughts on the topic of "caching/flushing/transactions" that John raised, especially about their relations to range queries (I think some of them could be excluded from the scope of KIP-844, but I'd like to see if we have some consensus on the long-term vision): 0) Today all

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-05-24 Thread Guozhang Wang
Hello Alex, Thanks for writing the proposal! Glad to see it coming. I think this is the kind of KIP where so many devils are buried in the details that it's better to start working on a POC, either in parallel, or before we resume our discussion, rather than blocking any

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-05-24 Thread Sagar
Hi Alexander, Thanks for the KIP! This seems like a great proposal. I have the same opinion as John on the Configuration part, though. I think the two-level config and its behaviour based on the setting/unsetting of the flag seem confusing to me as well. Since the KIP seems specifically centred

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-05-24 Thread John Roesler
Thanks for the KIP, Alex! I'm really happy to see your proposal. This improvement fills a long-standing gap. I have a few questions: 1. Configuration The KIP only mentions RocksDB, but of course, Streams also ships with an InMemory store, and users also plug in their own custom state stores.

[DISCUSS] KIP-844: Transactional State Stores

2022-05-24 Thread Alexander Sorokoumov
Hi all, I've written a KIP for making Kafka Streams state stores transactional and would like to start a discussion: https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores Best, Alex