Hey,

Thank you for the wealth of great suggestions and questions! I am going to
address the feedback in batches and update the proposal async, as it is
probably going to be easier for everyone. I will also write a separate
message after making updates to the KIP.

@John,

> Did you consider instead just adding the option to the
> RocksDB*StoreSupplier classes and the factories in Stores ?

Thank you for suggesting that. I think that this idea is better than what I
came up with and will update the KIP with configuring transactionality via
the suppliers and Stores.

what is the advantage over just doing the same thing with the RecordCache
> and not introducing the WriteBatch at all?

Can you point me to RecordCache? I can't find it in the project. The
advantage would be that WriteBatch guarantees write atomicity. As far as I
understood the way RecordCache works, it might leave the system in an
inconsistent state during crash failure on write.

You mentioned that a transactional store can help reduce duplication in the
> case of ALOS

I will remove claims about ALOS from the proposal. Thank you for
elaborating!

As a reminder, we have a new IQv2 mechanism now. Should we propose any
> changes to IQv1 to support this transactional mechanism, versus just
> proposing it for IQv2? Certainly, it seems strange only to propose a change
> for IQv1 and not v2.


 I will update the proposal with complementary API changes for IQv2

What should IQ do if I request to readCommitted on a non-transactional
> store?

We can assume that non-transactional stores commit on write, so IQ works in
the same way with non-transactional stores regardless of the value of
readCommitted.


 @Guozhang,

* If we crash between line 3 and 4, then at that time the local persistent
> store image is representing as of offset 200, but upon recovery all
> changelog records from 100 to log-end-offset would be considered as aborted
> and not be replayed and we would restart processing from position 100.
> Restart processing will violate EOS.I'm not sure how e.g. RocksDB's
> WriteBatchWithIndex would make sure that the step 4 and step 5 could be
> done atomically here.


Could you please point me to the place in the codebase where a task flushes
the store before committing the transaction?
Looking at TaskExecutor (
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167),
StreamTask#prepareCommit (
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398),
and CachedStateStore (
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34)
we flush the cache, but not the underlying state store. Explicit
StateStore#flush happens in AbstractTask#maybeWriteCheckpoint (
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
).
Is there something I am missing here?

Today all cached data that have not been flushed are not committed for
> sure, but even flushed data to the persistent underlying store may also be
> uncommitted since flushing can be triggered asynchronously before the
> commit.

Can you please point me to the place in the codebase where we trigger async
flush before the commit? This would certainly be a reason to introduce a
dedicated StateStore#commit method.

Thanks again for the feedback. I am going to update the KIP and then
respond to the next batch of questions and suggestions.

Best,
Alex

On Mon, May 30, 2022 at 5:13 PM Suhas Satish <ssat...@confluent.io.invalid>
wrote:

> Thanks for the KIP proposal Alex.
> 1. Configuration default
>
> You mention applications using streams DSL with built-in rocksDB state
> store will get transactional state stores by default when EOS is enabled,
> but the default implementation for apps using PAPI will fallback to
> non-transactional behavior.
> Shouldn't we have the same default behavior for both types of apps - DSL
> and PAPI?
>
> On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> > Thanks for the PR, Alex!
> >
> > I am also glad to see this coming.
> >
> >
> > 1. Configuration
> >
> > I would also prefer to restrict the configuration of transactional on
> > the state sore. Ideally, calling method transactional() on the state
> > store would be enough. An option on the store builder would make it
> > possible to turn transactionality on and off (as John proposed).
> >
> >
> > 2. Memory usage in RocksDB
> >
> > This seems to be a major issue. We do not have any guarantee that
> > uncommitted writes fit into memory and I guess we will never have. What
> > happens when the uncommitted writes do not fit into memory? Does RocksDB
> > throw an exception? Can we handle such an exception without crashing?
> >
> > Does the RocksDB behavior even need to be included in this KIP? In the
> > end it is an implementation detail.
> >
> > What we should consider - though - is a memory limit in some form. And
> > what we do when the memory limit is exceeded.
> >
> >
> > 3. PoC
> >
> > I agree with Guozhang that a PoC is a good idea to better understand the
> > devils in the details.
> >
> >
> > Best,
> > Bruno
> >
> > On 25.05.22 01:52, Guozhang Wang wrote:
> > > Hello Alex,
> > >
> > > Thanks for writing the proposal! Glad to see it coming. I think this is
> > the
> > > kind of a KIP that since too many devils would be buried in the details
> > and
> > > it's better to start working on a POC, either in parallel, or before we
> > > resume our discussion, rather than blocking any implementation until we
> > are
> > > satisfied with the proposal.
> > >
> > > Just as a concrete example, I personally am still not 100% clear how
> the
> > > proposal would work to achieve EOS with the state stores. For example,
> > the
> > > commit procedure today looks like this:
> > >
> > > 0: there's an existing checkpoint file indicating the changelog offset
> of
> > > the local state store image is 100. Now a commit is triggered:
> > > 1. flush cache (since it contains partially processed records), make
> sure
> > > all records are written to the producer.
> > > 2. flush producer, making sure all changelog records have now acked. //
> > > here we would get the new changelog position, say 200
> > > 3. flush store, make sure all writes are persisted.
> > > 4. producer.sendOffsetsToTransaction(); producer.commitTransaction();
> //
> > we
> > > would make the writes in changelog up to offset 200 committed
> > > 5. Update the checkpoint file to 200.
> > >
> > > The question about atomicity between those lines, for example:
> > >
> > > * If we crash between line 4 and line 5, the local checkpoint file
> would
> > > stay as 100, and upon recovery we would replay the changelog from 100
> to
> > > 200. This is not ideal but does not violate EOS, since the changelogs
> are
> > > all overwrites anyways.
> > > * If we crash between line 3 and 4, then at that time the local
> > persistent
> > > store image is representing as of offset 200, but upon recovery all
> > > changelog records from 100 to log-end-offset would be considered as
> > aborted
> > > and not be replayed and we would restart processing from position 100.
> > > Restart processing will violate EOS.I'm not sure how e.g. RocksDB's
> > > WriteBatchWithIndex would make sure that the step 4 and step 5 could be
> > > done atomically here.
> > >
> > > Originally what I was thinking when creating the JIRA ticket is that we
> > > need to let the state store to provide a transactional API like "token
> > > commit()" used in step 4) above which returns a token, that e.g. in our
> > > example above indicates offset 200, and that token would be written as
> > part
> > > of the records in Kafka transaction in step 5). And upon recovery the
> > state
> > > store would have another API like "rollback(token)" where the token is
> > read
> > > from the latest committed txn, and be used to rollback the store to
> that
> > > committed image. I think your proposal is different, and it seems like
> > > you're proposing we swap step 3) and 4) above, but the atomicity issue
> > > still remains since now you may have the store image at 100 but the
> > > changelog is committed at 200. I'd like to learn more about the details
> > > on how it resolves such issues.
> > >
> > > Anyways, that's just an example to make the point that there are lots
> of
> > > implementational details which would drive the public API design, and
> we
> > > should probably first do a POC, and come back to discuss the KIP. Let
> me
> > > know what you think?
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Tue, May 24, 2022 at 10:35 AM Sagar <sagarmeansoc...@gmail.com>
> > wrote:
> > >
> > >> Hi Alexander,
> > >>
> > >> Thanks for the KIP! This seems like a great proposal. I have the same
> > >> opinion as John on the Configuration part though. I think the 2 level
> > >> config and its behaviour based on the setting/unsetting of the flag
> > seems
> > >> confusing to me as well. Since the KIP seems specifically centred
> around
> > >> RocksDB it might be better to add it at the Supplier level as John
> > >> suggested.
> > >>
> > >> On similar lines, this config name =>
> > *statestore.transactional.mechanism
> > >> *may
> > >> also need rethinking as the value assigned to it(rocksdb_indexbatch)
> > >> implicitly seems to assume that rocksdb is the only statestore that
> > Kafka
> > >> Stream supports while that's not the case.
> > >>
> > >> Also, regarding the potential memory pressure that can be introduced
> by
> > >> WriteBatchIndex, do you think it might make more sense to include some
> > >> numbers/benchmarks on how much the memory consumption might increase?
> > >>
> > >> Lastly, the read_uncommitted flag's behaviour on IQ may need more
> > >> elaboration.
> > >>
> > >> These points aside, as I said, this is a great proposal!
> > >>
> > >> Thanks!
> > >> Sagar.
> > >>
> > >> On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org>
> > wrote:
> > >>
> > >>> Thanks for the KIP, Alex!
> > >>>
> > >>> I'm really happy to see your proposal. This improvement fills a
> > >>> long-standing gap.
> > >>>
> > >>> I have a few questions:
> > >>>
> > >>> 1. Configuration
> > >>> The KIP only mentions RocksDB, but of course, Streams also ships with
> > an
> > >>> InMemory store, and users also plug in their own custom state stores.
> > It
> > >> is
> > >>> also common to use multiple types of state stores in the same
> > application
> > >>> for different purposes.
> > >>>
> > >>> Against this backdrop, the choice to configure transactionality as a
> > >>> top-level config, as well as to configure the store transaction
> > mechanism
> > >>> as a top-level config, seems a bit off.
> > >>>
> > >>> Did you consider instead just adding the option to the
> > >>> RocksDB*StoreSupplier classes and the factories in Stores ? It seems
> > like
> > >>> the desire to enable the feature by default, but with a feature-flag
> to
> > >>> disable it was a factor here. However, as you pointed out, there are
> > some
> > >>> major considerations that users should be aware of, so opt-in doesn't
> > >> seem
> > >>> like a bad choice, either. You could add an Enum argument to those
> > >>> factories like `RocksDBTransactionalMechanism.{NONE,
> > >>>
> > >>> Some points in favor of this approach:
> > >>> * Avoid "stores that don't support transactions ignore the config"
> > >>> complexity
> > >>> * Users can choose how to spend their memory budget, making some
> stores
> > >>> transactional and others not
> > >>> * When we add transactional support to in-memory stores, we don't
> have
> > to
> > >>> figure out what to do with the mechanism config (i.e., what do you
> set
> > >> the
> > >>> mechanism to when there are multiple kinds of transactional stores in
> > the
> > >>> topology?)
> > >>>
> > >>> 2. caching/flushing/transactions
> > >>> The coupling between memory usage and flushing that you mentioned is
> a
> > >> bit
> > >>> troubling. It also occurs to me that there seems to be some
> > relationship
> > >>> with the existing record cache, which is also an in-memory holding
> area
> > >> for
> > >>> records that are not yet written to the cache and/or store (albeit
> with
> > >> no
> > >>> particular semantics). Have you considered how all these components
> > >> should
> > >>> relate? For example, should a "full" WriteBatch actually trigger a
> > flush
> > >> so
> > >>> that we don't get OOMEs? If the proposed transactional mechanism
> forces
> > >> all
> > >>> uncommitted writes to be buffered in memory, until a commit, then
> what
> > is
> > >>> the advantage over just doing the same thing with the RecordCache and
> > not
> > >>> introducing the WriteBatch at all?
> > >>>
> > >>> 3. ALOS
> > >>> You mentioned that a transactional store can help reduce duplication
> in
> > >>> the case of ALOS. We might want to be careful about claims like that.
> > >>> Duplication isn't the way that repeated processing manifests in state
> > >>> stores. Rather, it is in the form of dirty reads during reprocessing.
> > >> This
> > >>> feature may reduce the incidence of dirty reads during reprocessing,
> > but
> > >>> not in a predictable way. During regular processing today, we will
> send
> > >>> some records through to the changelog in between commit intervals.
> > Under
> > >>> ALOS, if any of those dirty writes gets committed to the changelog
> > topic,
> > >>> then upon failure, we have to roll the store forward to them anyway,
> > >>> regardless of this new transactional mechanism. That's a fixable
> > problem,
> > >>> by the way, but this KIP doesn't seem to fix it. I wonder if we
> should
> > >> make
> > >>> any claims about the relationship of this feature to ALOS if the
> > >> real-world
> > >>> behavior is so complex.
> > >>>
> > >>> 4. IQ
> > >>> As a reminder, we have a new IQv2 mechanism now. Should we propose
> any
> > >>> changes to IQv1 to support this transactional mechanism, versus just
> > >>> proposing it for IQv2? Certainly, it seems strange only to propose a
> > >> change
> > >>> for IQv1 and not v2.
> > >>>
> > >>> Regarding your proposal for IQv1, I'm unsure what the behavior should
> > be
> > >>> for readCommitted, since the current behavior also reads out of the
> > >>> RecordCache. I guess if readCommitted==false, then we will continue
> to
> > >> read
> > >>> from the cache first, then the Batch, then the store; and if
> > >>> readCommitted==true, we would skip the cache and the Batch and only
> > read
> > >>> from the persistent RocksDB store?
> > >>>
> > >>> What should IQ do if I request to readCommitted on a
> non-transactional
> > >>> store?
> > >>>
> > >>> Thanks again for proposing the KIP, and my apologies for the long
> > reply;
> > >>> I'm hoping to air all my concerns in one "batch" to save time for
> you.
> > >>>
> > >>> Thanks,
> > >>> -John
> > >>>
> > >>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
> > >>>> Hi all,
> > >>>>
> > >>>> I've written a KIP for making Kafka Streams state stores
> transactional
> > >>> and
> > >>>> would like to start a discussion:
> > >>>>
> > >>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> > >>>>
> > >>>> Best,
> > >>>> Alex
> > >>>
> > >>
> > >
> > >
> >
>
>
> --
>
> [image: Confluent] <https://www.confluent.io>
> Suhas Satish
> Engineering Manager
> Follow us: [image: Blog]
> <
> https://www.confluent.io/blog?utm_source=footer&utm_medium=email&utm_campaign=ch.email-signature_type.community_content.blog
> >[image:
> Twitter] <https://twitter.com/ConfluentInc>[image: LinkedIn]
> <https://www.linkedin.com/company/confluent/>
>
> [image: Try Confluent Cloud for Free]
> <
> https://www.confluent.io/get-started?utm_campaign=tm.fm-apac_cd.inbound&utm_source=gmail&utm_medium=organic
> >
>

Reply via email to