To close the loop on this thread: KIP-892 was accepted and is now
implemented, so I'll go ahead and mark this KIP as discarded.
Thanks a lot, Alex, for spending so much time on this very important
feature! Without your groundwork, we would not have KIP-892, and your
contributions are
Hi Alex,
Thanks for getting back to me. I actually have most of a working
implementation already. I'm going to write it up as a new KIP, so that it
can be reviewed independently of KIP-844.
Hopefully, working together we can have it ready sooner.
I'll keep you posted on my progress.
Regards,
Hey Nick,
Thank you for the prototype testing and benchmarking, and sorry for the
late reply!
I agree that it is worth revisiting the WriteBatchWithIndex approach. I
will implement a fork of the current prototype that uses that mechanism to
ensure transactionality and let you know when it is
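For readers following along: a WriteBatchWithIndex buffers uncommitted writes in an indexed in-memory structure so reads can see them before the batch is applied to the underlying store in one shot. Below is a toy sketch of that idea using plain Java maps; it is not the RocksDB API and all names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the indexed-write-batch idea: writes are buffered in a
// batch; reads consult the batch first, then the base store; commit()
// applies the whole batch at once, and abort() discards it.
public class IndexedBatchSketch {
    private final Map<String, String> base = new HashMap<>();   // committed state
    private final Map<String, String> batch = new HashMap<>();  // uncommitted writes

    public void put(String key, String value) {
        batch.put(key, value);              // buffered, not yet committed
    }

    public String get(String key) {
        // Uncommitted writes shadow committed ones.
        return batch.containsKey(key) ? batch.get(key) : base.get(key);
    }

    public void commit() {
        base.putAll(batch);                 // apply as a unit (single-threaded toy)
        batch.clear();
    }

    public void abort() {
        batch.clear();                      // a crash before commit loses nothing durable
    }
}
```

The point of the mechanism is the abort path: writes that never reached commit() are simply dropped, so the base store only ever contains complete transactions.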
Hi everyone,
Sorry to dredge this up again. I've had a chance to start doing some
testing with the WIP Pull Request, and it appears as though the secondary
store solution performs rather poorly.
In our testing, we had a non-transactional state store that would restore
(from scratch) at a rate
Hey Nick,
It is going to be option c. Existing state is considered to be committed
and there will be an additional RocksDB for uncommitted writes.
I am out of office until October 24. I will update KIP and make sure that
we have an upgrade test for that after coming back from vacation.
Best,
Hi everyone,
I realise this has already been voted on and accepted, but it occurred to
me today that the KIP doesn't define the migration/upgrade path for
existing non-transactional StateStores that *become* transactional, i.e. by
adding the transactional boolean to the StateStore constructor.
Hey Guozhang,
Sounds good. I annotated all added StateStore methods (commit, recover,
transactional) with @Evolving.
Best,
Alex
On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang wrote:
> Hello Alex,
>
> Thanks for the detailed replies, I think that makes sense, and in the long
> run we would
Hello Alex,
Thanks for the detailed replies, I think that makes sense, and in the long
run we would need some public indicators from StateStore to determine if
checkpoints can really be used to indicate clean snapshots.
As for the @Evolving label, I think we can still keep it but for a
different
Hey Guozhang,
I think that we will have to keep StateStore#transactional() because
post-commit checkpointing of non-txn state stores will break the guarantees
we want in ProcessorStateManager#initializeStoreOffsetsFromCheckpoint for
correct recovery. Let's consider checkpoint-recovery behavior
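The concern can be sketched as a small decision function (hypothetical names; the real logic lives in ProcessorStateManager#initializeStoreOffsetsFromCheckpoint): a checkpoint offset is only trustworthy if the store guarantees everything up to it was persisted atomically. For a non-transactional store, a post-commit checkpoint may cover partially flushed writes, so the safe fallback is to wipe and restore from the changelog.

```java
// Hypothetical sketch of the recovery decision discussed above; names and
// structure are illustrative, not the actual Kafka Streams code.
public class RecoverySketch {

    interface Store {
        boolean transactional();
        long recover(long checkpointOffset); // land on a transaction boundary
    }

    /** Returns the changelog offset from which restoration must start. */
    static long startingOffset(Store store, Long checkpointOffset) {
        if (checkpointOffset == null) {
            return 0L;                       // no checkpoint: restore from scratch
        }
        if (store.transactional()) {
            // A txn store can trust the checkpoint: recover() lands on a
            // consistent transaction boundary at or after it.
            return store.recover(checkpointOffset);
        }
        // Non-txn store after an unclean shutdown: the checkpoint may cover
        // partially flushed writes, so discard local state and restore fully.
        return 0L;
    }
}
```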
Hello Alex,
Thanks for the replies!
> As long as we allow custom user implementations of that interface, we
> should probably either keep that flag to distinguish between transactional
> and non-transactional implementations or change the contract behind the
> interface. What do you think?
Regarding
Hi Alex,
Thanks for your response. Yeah, I kind of mixed up the EOS behaviour with
state stores. Thanks for clarifying.
I think what you are suggesting wrt querying secondary stores makes sense.
What I had imagined was trying to leverage the fact that the keys are
sorted in a known order. So, before
Hey Sagar,
I'll start from the end.
> if EOS is enabled, it would return only committed data.
I think you might refer to Kafka's consumer isolation levels. To my
knowledge, they only work for consuming data from a topic. For example,
READ_COMMITTED prevents reading data from an ongoing Kafka
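For reference, the consumer isolation level mentioned here is set via the isolation.level consumer config and governs what a consumer reads from a topic, not what a state store returns. A minimal snippet (broker address and group id are placeholders):

```java
import java.util.Properties;

public class IsolationConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "example-group");           // placeholder
        // READ_COMMITTED: the consumer skips records that belong to
        // aborted or still-ongoing producer transactions.
        props.put("isolation.level", "read_committed");
        return props;
    }
}
```

The default is read_uncommitted, which is why enabling EOS on the processing side says nothing by itself about what uncommitted data a state store read may observe.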
Hi Alex,
I went through the KIP again and it looks good to me. I just had a question
on the secondary state stores:
*All writes and deletes go to the temporary store. Reads query the
temporary store; if the data is missing, query the regular store. Range
reads query both stores and return a
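The quoted read path can be modeled with two TreeMaps standing in for the temporary and regular RocksDB instances (a toy model, not the PoC code): single-key reads check the temporary store first, and range reads merge both stores with the temporary store winning on key collisions.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Toy model of the secondary-store approach: uncommitted writes live in a
// temporary store; reads overlay it on the regular (committed) store.
public class SecondaryStoreSketch {
    private final TreeMap<String, String> regular = new TreeMap<>();
    private final TreeMap<String, String> temporary = new TreeMap<>();

    public void put(String key, String value) {
        temporary.put(key, value);          // all writes go to the temp store
    }

    public String get(String key) {
        String v = temporary.get(key);      // temp store first, then regular
        return v != null ? v : regular.get(key);
    }

    public SortedMap<String, String> range(String from, String to) {
        // Start from the regular store, then overlay the temp store so its
        // (newer, uncommitted) entries shadow committed ones.
        SortedMap<String, String> merged = new TreeMap<>(regular.subMap(from, to));
        merged.putAll(temporary.subMap(from, to));
        return merged;
    }

    public void commit() {
        regular.putAll(temporary);          // promote uncommitted writes
        temporary.clear();
    }
}
```

The benchmarking complaint earlier in the thread is easy to see from this shape: every range read pays for an extra store scan plus a merge.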
Hey Guozhang,
Thank you for elaborating! I like your idea to introduce a StreamsConfig
specifically for the default store APIs. You mentioned Materialized, but I
think changes in StreamJoined follow the same logic.
I updated the KIP and the prototype according to your suggestions:
* Add a new
Hello Alex,
Thanks for the replies. Regarding the global config v.s. per-store spec, I
agree with John's early comments to some degree, but I think we may well
distinguish a couple of scenarios here. In sum, we are discussing the
following levels of per-store spec:
*
Hey Guozhang, Bruno,
Thank you for your feedback. I am going to respond to both of you in a
single email. I hope it is okay.
@Guozhang,
> We could, instead, have a global
> config to specify if the built-in stores should be transactional or not.
This was the original approach I took in this
Hi Alex,
Thanks a lot for explaining!
Now some aspects are clearer to me.
While I now understand how the state store can roll forward, I have the
feeling that rolling forward is specific to the 2-state-store
implementation with RocksDB in your PoC. Other state store
implementations might
Hello Alex,
I made a pass on your WIP PR (thanks for putting it up! It helps me
understand many details). And here are some more thoughts:
1. I looked over all the places where `.transactional()` is called, and I
still think that they are not needed, and hence we could consider removing
Hey Bruno,
Thank you for the suggestions and the clarifying questions. I believe that
they cover the core of this proposal, so it is crucial for us to be on the
same page.
1. Don't you want to deprecate StateStore#flush()?
Good call! I updated both the proposal and the prototype.
2. I would
Hi Alex,
Thanks for the updates!
1. Don't you want to deprecate StateStore#flush()? As far as I
understand, commit() is the new flush(), right? If you do not deprecate
it, you don't get rid of the room for error you describe in your KIP by
having both a flush() and a commit().
2. I would shorten
Hey Nick,
Thank you for the kind words and the feedback! I'll definitely add an
option to configure the transactional mechanism in Stores factory method
via an argument, as John previously suggested, and might add the in-memory
option via RocksDB Indexed Batches if I figure out why their creation via
Hey Guozhang,
> 1) About the param passed into the `recover()` function: it seems to me
> that the semantics of "recover(offset)" is: recover this state to a
> transaction boundary which is at least the passed-in offset. And the only
> possibility that the returned offset is different than the
Hi Alex,
Excellent proposal, I'm very keen to see this land!
Would it be useful to permit configuring the type of store used for
uncommitted offsets on a store-by-store basis? This way, users could choose
whether to use, e.g. an in-memory store or RocksDB, potentially reducing
the overheads
Hello Alex,
Thanks for the updated KIP, I looked over it and browsed the WIP and just
have a couple meta thoughts:
1) About the param passed into the `recover()` function: it seems to me
that the semantics of "recover(offset)" is: recover this state to a
transaction boundary which is at least
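The contract under discussion can be made concrete with a hypothetical signature (all names illustrative, not the KIP's final API): recover(offset) brings the store to some transaction boundary at or after the requested offset and returns the offset it actually landed on, which the caller then compares against the checkpoint.

```java
// Hypothetical illustration of the recover(offset) semantics discussed
// above: land on a transaction boundary >= the requested offset and report
// where the store actually ended up.
public class RecoverContractSketch {

    // Offsets at which transactions committed, in ascending order.
    private final long[] txnBoundaries;

    public RecoverContractSketch(long... txnBoundaries) {
        this.txnBoundaries = txnBoundaries;
    }

    /** Returns the first transaction boundary at or after {@code offset}. */
    public long recover(long offset) {
        for (long boundary : txnBoundaries) {
            if (boundary >= offset) {
                return boundary;     // may exceed the requested offset
            }
        }
        // Already past every known boundary: report the last one.
        return txnBoundaries[txnBoundaries.length - 1];
    }
}
```

This is why the returned offset can differ from the argument: if the checkpoint falls between two boundaries, the store rolls forward to the next one rather than stopping mid-transaction.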
Hi,
I updated the KIP with the following changes:
* Replaced in-memory batches with the secondary-store approach as the
default implementation to address the feedback about memory pressure as
suggested by Sagar and Bruno.
* Introduced StateStore#commit and StateStore#recover methods as an
Hi,
As a status update, I made the following changes to the KIP:
* replaced configuration via the top-level config with configuration via
Stores factory and StoreSuppliers,
* added IQv2 and elaborated how readCommitted will work when the store is
not transactional,
* removed claims about ALOS.
I
Alex,
Thanks for your replies! That is very helpful.
Just to broaden our discussions a bit here, I think there are some other
approaches in parallel to the idea of "enforce to only persist upon
explicit flush" and I'd like to throw one here -- not really advocating it,
but just for us to compare
Hi Guozhang,
> But I'm still trying to clarify how it guarantees EOS, and it seems that we
> would achieve it by enforcing to not persist any data written within this
> transaction until step 4. Is that correct?
This is correct. Both alternatives - in-memory WriteBatchWithIndex and
Hello Alex,
> we flush the cache, but not the underlying state store.
You're right. The ordering I mentioned above is actually:
...
3. producer.sendOffsetsToTransaction(); producer.commitTransaction();
4. flush store, make sure all writes are persisted.
5. Update the checkpoint file to 200.
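The ordering in this message is the crux of the EOS argument: the Kafka transaction commits first, then the store flush, then the checkpoint update, so a crash between any two steps leaves the checkpoint at or behind the durable state. A toy state machine enforcing that order (illustrative only, not Kafka Streams code):

```java
// Toy model of the commit ordering quoted above. Each step may only run
// after the previous one, so the checkpoint can never get ahead of the
// data that was actually flushed.
public class CommitOrderSketch {
    private boolean kafkaTxnCommitted = false;
    private boolean storeFlushed = false;
    private long checkpointOffset = -1L;

    public void commitKafkaTransaction() {
        // step 3: producer.sendOffsetsToTransaction(); producer.commitTransaction();
        kafkaTxnCommitted = true;
    }

    public void flushStore() {
        // step 4: flush store, make sure all writes are persisted
        if (!kafkaTxnCommitted) throw new IllegalStateException("commit Kafka txn first");
        storeFlushed = true;
    }

    public void writeCheckpoint(long offset) {
        // step 5: update the checkpoint file
        if (!storeFlushed) throw new IllegalStateException("flush store first");
        checkpointOffset = offset;
    }

    public long checkpointOffset() {
        return checkpointOffset;
    }
}
```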
Hey,
Thank you for the wealth of great suggestions and questions! I am going to
address the feedback in batches and update the proposal async, as it is
probably going to be easier for everyone. I will also write a separate
message after making updates to the KIP.
@John,
> Did you consider
Thanks for the KIP proposal Alex.
1. Configuration default
You mention that applications using the Streams DSL with the built-in
RocksDB state store will get transactional state stores by default when
EOS is enabled, but the default implementation for apps using the PAPI
will fall back to non-transactional
Thanks for the PR, Alex!
I am also glad to see this coming.
1. Configuration
I would also prefer to restrict the configuration of transactional on
the state store. Ideally, calling the method transactional() on the state
store would be enough. An option on the store builder would make it
Also to dump my thoughts on the topic of "caching/flushing/transactions"
that John raised, especially about their relations to range queries (I
think some of them could be excluded from the scope of KIP-844, but I'd
like to see if we have some consensus on the long-term vision):
0) Today all
Hello Alex,
Thanks for writing the proposal! Glad to see it coming. I think this is the
kind of KIP where too many devils would be buried in the details, so
it's better to start working on a POC, either in parallel or before we
resume our discussion, rather than blocking any
Hi Alexander,
Thanks for the KIP! This seems like a great proposal. I have the same
opinion as John on the Configuration part though. I think the 2 level
config and its behaviour based on the setting/unsetting of the flag seems
confusing to me as well. Since the KIP seems specifically centred
Thanks for the KIP, Alex!
I'm really happy to see your proposal. This improvement fills a long-standing
gap.
I have a few questions:
1. Configuration
The KIP only mentions RocksDB, but of course, Streams also ships with an
InMemory store, and users also plug in their own custom state stores.
Hi all,
I've written a KIP for making Kafka Streams state stores transactional and
would like to start a discussion:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
Best,
Alex