Hi Guozhang,

But I'm still trying to clarify how it guarantees EOS, and it seems that we
> would achieve it by enforcing to not persist any data written within this
> transaction until step 4. Is that correct?


This is correct. Both alternatives - in-memory WriteBatchWithIndex and
transactionality via the secondary store guarantee EOS by not persisting
data in the "main" state store until it is committed in the changelog topic.

Oh what I meant is not what KStream code does, but that StateStore impl
> classes themselves could potentially flush data to become persisted
> asynchronously


Thank you for elaborating! You are correct, the underlying state store
should not persist data until the streams app calls StateStore#flush. There
are 2 options how a State Store implementation can guarantee that - either
keep uncommitted writes in memory or be able to roll back the changes that
were not committed during recovery. RocksDB's WriteBatchWithIndex is an
implementation of the first option. A considered alternative, Transactions
via Secondary State Store for Uncommitted Changes, is the way to implement
the second option.

As everyone correctly pointed out, keeping uncommitted data in memory
introduces a very real risk of OOM that we will need to handle. The more I
think about it, the more I lean towards going with the Transactions via
Secondary Store as the way to implement transactionality as it does not
have that issue.

Best,
Alex


On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Alex,
>
> > we flush the cache, but not the underlying state store.
>
> You're right. The ordering I mentioned above is actually:
>
> ...
> 3. producer.sendOffsetsToTransaction(); producer.commitTransaction();
> 4. flush store, make sure all writes are persisted.
> 5. Update the checkpoint file to 200.
>
> But I'm still trying to clarify how it guarantees EOS, and it seems that we
> would achieve it by enforcing to not persist any data written within this
> transaction until step 4. Is that correct?
>
> > Can you please point me to the place in the codebase where we trigger
> async flush before the commit?
>
> Oh what I meant is not what KStream code does, but that StateStore impl
> classes themselves could potentially flush data to become persisted
> asynchronously, e.g. RocksDB does that naturally out of the control of
> KStream code. I think it is related to my previous question: if we think by
> guaranteeing EOS at the state store level, we would effectively ask the
> impl classes that "you should not persist any data until `flush` is called
> explicitly", is the StateStore interface the right level to enforce such
> mechanisms, or should we just do that on top of the StateStores, e.g.
> during the transaction we just keep all the writes in the cache (of course
> we need to consider how to work around memory pressure as previously
> mentioned), and then upon committing, we just write the cached records as a
> whole into the store and then call flush.
>
>
> Guozhang
>
>
>
>
>
>
>
> On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov
> <asorokou...@confluent.io.invalid> wrote:
>
> > Hey,
> >
> > Thank you for the wealth of great suggestions and questions! I am going
> to
> > address the feedback in batches and update the proposal async, as it is
> > probably going to be easier for everyone. I will also write a separate
> > message after making updates to the KIP.
> >
> > @John,
> >
> > > Did you consider instead just adding the option to the
> > > RocksDB*StoreSupplier classes and the factories in Stores ?
> >
> > Thank you for suggesting that. I think that this idea is better than
> what I
> > came up with and will update the KIP with configuring transactionality
> via
> > the suppliers and Stores.
> >
> > what is the advantage over just doing the same thing with the RecordCache
> > > and not introducing the WriteBatch at all?
> >
> > Can you point me to RecordCache? I can't find it in the project. The
> > advantage would be that WriteBatch guarantees write atomicity. As far as
> I
> > understood the way RecordCache works, it might leave the system in an
> > inconsistent state during crash failure on write.
> >
> > You mentioned that a transactional store can help reduce duplication in
> the
> > > case of ALOS
> >
> > I will remove claims about ALOS from the proposal. Thank you for
> > elaborating!
> >
> > As a reminder, we have a new IQv2 mechanism now. Should we propose any
> > > changes to IQv1 to support this transactional mechanism, versus just
> > > proposing it for IQv2? Certainly, it seems strange only to propose a
> > change
> > > for IQv1 and not v2.
> >
> >
> >  I will update the proposal with complementary API changes for IQv2
> >
> > What should IQ do if I request to readCommitted on a non-transactional
> > > store?
> >
> > We can assume that non-transactional stores commit on write, so IQ works
> in
> > the same way with non-transactional stores regardless of the value of
> > readCommitted.
> >
> >
> >  @Guozhang,
> >
> > * If we crash between line 3 and 4, then at that time the local
> persistent
> > > store image is representing as of offset 200, but upon recovery all
> > > changelog records from 100 to log-end-offset would be considered as
> > aborted
> > > and not be replayed and we would restart processing from position 100.
> > > Restart processing will violate EOS.I'm not sure how e.g. RocksDB's
> > > WriteBatchWithIndex would make sure that the step 4 and step 5 could be
> > > done atomically here.
> >
> >
> > Could you please point me to the place in the codebase where a task
> flushes
> > the store before committing the transaction?
> > Looking at TaskExecutor (
> >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167
> > ),
> > StreamTask#prepareCommit (
> >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398
> > ),
> > and CachedStateStore (
> >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34
> > )
> > we flush the cache, but not the underlying state store. Explicit
> > StateStore#flush happens in AbstractTask#maybeWriteCheckpoint (
> >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
> > ).
> > Is there something I am missing here?
> >
> > Today all cached data that have not been flushed are not committed for
> > > sure, but even flushed data to the persistent underlying store may also
> > be
> > > uncommitted since flushing can be triggered asynchronously before the
> > > commit.
> >
> > Can you please point me to the place in the codebase where we trigger
> async
> > flush before the commit? This would certainly be a reason to introduce a
> > dedicated StateStore#commit method.
> >
> > Thanks again for the feedback. I am going to update the KIP and then
> > respond to the next batch of questions and suggestions.
> >
> > Best,
> > Alex
> >
> > On Mon, May 30, 2022 at 5:13 PM Suhas Satish
> <ssat...@confluent.io.invalid
> > >
> > wrote:
> >
> > > Thanks for the KIP proposal Alex.
> > > 1. Configuration default
> > >
> > > You mention applications using streams DSL with built-in rocksDB state
> > > store will get transactional state stores by default when EOS is
> enabled,
> > > but the default implementation for apps using PAPI will fallback to
> > > non-transactional behavior.
> > > Shouldn't we have the same default behavior for both types of apps -
> DSL
> > > and PAPI?
> > >
> > > On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <cado...@apache.org>
> > wrote:
> > >
> > > > Thanks for the PR, Alex!
> > > >
> > > > I am also glad to see this coming.
> > > >
> > > >
> > > > 1. Configuration
> > > >
> > > > I would also prefer to restrict the configuration of transactional on
> > > > the state sore. Ideally, calling method transactional() on the state
> > > > store would be enough. An option on the store builder would make it
> > > > possible to turn transactionality on and off (as John proposed).
> > > >
> > > >
> > > > 2. Memory usage in RocksDB
> > > >
> > > > This seems to be a major issue. We do not have any guarantee that
> > > > uncommitted writes fit into memory and I guess we will never have.
> What
> > > > happens when the uncommitted writes do not fit into memory? Does
> > RocksDB
> > > > throw an exception? Can we handle such an exception without crashing?
> > > >
> > > > Does the RocksDB behavior even need to be included in this KIP? In
> the
> > > > end it is an implementation detail.
> > > >
> > > > What we should consider - though - is a memory limit in some form.
> And
> > > > what we do when the memory limit is exceeded.
> > > >
> > > >
> > > > 3. PoC
> > > >
> > > > I agree with Guozhang that a PoC is a good idea to better understand
> > the
> > > > devils in the details.
> > > >
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On 25.05.22 01:52, Guozhang Wang wrote:
> > > > > Hello Alex,
> > > > >
> > > > > Thanks for writing the proposal! Glad to see it coming. I think
> this
> > is
> > > > the
> > > > > kind of a KIP that since too many devils would be buried in the
> > details
> > > > and
> > > > > it's better to start working on a POC, either in parallel, or
> before
> > we
> > > > > resume our discussion, rather than blocking any implementation
> until
> > we
> > > > are
> > > > > satisfied with the proposal.
> > > > >
> > > > > Just as a concrete example, I personally am still not 100% clear
> how
> > > the
> > > > > proposal would work to achieve EOS with the state stores. For
> > example,
> > > > the
> > > > > commit procedure today looks like this:
> > > > >
> > > > > 0: there's an existing checkpoint file indicating the changelog
> > offset
> > > of
> > > > > the local state store image is 100. Now a commit is triggered:
> > > > > 1. flush cache (since it contains partially processed records),
> make
> > > sure
> > > > > all records are written to the producer.
> > > > > 2. flush producer, making sure all changelog records have now
> acked.
> > //
> > > > > here we would get the new changelog position, say 200
> > > > > 3. flush store, make sure all writes are persisted.
> > > > > 4. producer.sendOffsetsToTransaction();
> producer.commitTransaction();
> > > //
> > > > we
> > > > > would make the writes in changelog up to offset 200 committed
> > > > > 5. Update the checkpoint file to 200.
> > > > >
> > > > > The question about atomicity between those lines, for example:
> > > > >
> > > > > * If we crash between line 4 and line 5, the local checkpoint file
> > > would
> > > > > stay as 100, and upon recovery we would replay the changelog from
> 100
> > > to
> > > > > 200. This is not ideal but does not violate EOS, since the
> changelogs
> > > are
> > > > > all overwrites anyways.
> > > > > * If we crash between line 3 and 4, then at that time the local
> > > > persistent
> > > > > store image is representing as of offset 200, but upon recovery all
> > > > > changelog records from 100 to log-end-offset would be considered as
> > > > aborted
> > > > > and not be replayed and we would restart processing from position
> > 100.
> > > > > Restart processing will violate EOS.I'm not sure how e.g. RocksDB's
> > > > > WriteBatchWithIndex would make sure that the step 4 and step 5
> could
> > be
> > > > > done atomically here.
> > > > >
> > > > > Originally what I was thinking when creating the JIRA ticket is
> that
> > we
> > > > > need to let the state store to provide a transactional API like
> > "token
> > > > > commit()" used in step 4) above which returns a token, that e.g. in
> > our
> > > > > example above indicates offset 200, and that token would be written
> > as
> > > > part
> > > > > of the records in Kafka transaction in step 5). And upon recovery
> the
> > > > state
> > > > > store would have another API like "rollback(token)" where the token
> > is
> > > > read
> > > > > from the latest committed txn, and be used to rollback the store to
> > > that
> > > > > committed image. I think your proposal is different, and it seems
> > like
> > > > > you're proposing we swap step 3) and 4) above, but the atomicity
> > issue
> > > > > still remains since now you may have the store image at 100 but the
> > > > > changelog is committed at 200. I'd like to learn more about the
> > details
> > > > > on how it resolves such issues.
> > > > >
> > > > > Anyways, that's just an example to make the point that there are
> lots
> > > of
> > > > > implementational details which would drive the public API design,
> and
> > > we
> > > > > should probably first do a POC, and come back to discuss the KIP.
> Let
> > > me
> > > > > know what you think?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, May 24, 2022 at 10:35 AM Sagar <sagarmeansoc...@gmail.com>
> > > > wrote:
> > > > >
> > > > >> Hi Alexander,
> > > > >>
> > > > >> Thanks for the KIP! This seems like a great proposal. I have the
> > same
> > > > >> opinion as John on the Configuration part though. I think the 2
> > level
> > > > >> config and its behaviour based on the setting/unsetting of the
> flag
> > > > seems
> > > > >> confusing to me as well. Since the KIP seems specifically centred
> > > around
> > > > >> RocksDB it might be better to add it at the Supplier level as John
> > > > >> suggested.
> > > > >>
> > > > >> On similar lines, this config name =>
> > > > *statestore.transactional.mechanism
> > > > >> *may
> > > > >> also need rethinking as the value assigned to
> it(rocksdb_indexbatch)
> > > > >> implicitly seems to assume that rocksdb is the only statestore
> that
> > > > Kafka
> > > > >> Stream supports while that's not the case.
> > > > >>
> > > > >> Also, regarding the potential memory pressure that can be
> introduced
> > > by
> > > > >> WriteBatchIndex, do you think it might make more sense to include
> > some
> > > > >> numbers/benchmarks on how much the memory consumption might
> > increase?
> > > > >>
> > > > >> Lastly, the read_uncommitted flag's behaviour on IQ may need more
> > > > >> elaboration.
> > > > >>
> > > > >> These points aside, as I said, this is a great proposal!
> > > > >>
> > > > >> Thanks!
> > > > >> Sagar.
> > > > >>
> > > > >> On Tue, May 24, 2022 at 10:35 PM John Roesler <
> vvcep...@apache.org>
> > > > wrote:
> > > > >>
> > > > >>> Thanks for the KIP, Alex!
> > > > >>>
> > > > >>> I'm really happy to see your proposal. This improvement fills a
> > > > >>> long-standing gap.
> > > > >>>
> > > > >>> I have a few questions:
> > > > >>>
> > > > >>> 1. Configuration
> > > > >>> The KIP only mentions RocksDB, but of course, Streams also ships
> > with
> > > > an
> > > > >>> InMemory store, and users also plug in their own custom state
> > stores.
> > > > It
> > > > >> is
> > > > >>> also common to use multiple types of state stores in the same
> > > > application
> > > > >>> for different purposes.
> > > > >>>
> > > > >>> Against this backdrop, the choice to configure transactionality
> as
> > a
> > > > >>> top-level config, as well as to configure the store transaction
> > > > mechanism
> > > > >>> as a top-level config, seems a bit off.
> > > > >>>
> > > > >>> Did you consider instead just adding the option to the
> > > > >>> RocksDB*StoreSupplier classes and the factories in Stores ? It
> > seems
> > > > like
> > > > >>> the desire to enable the feature by default, but with a
> > feature-flag
> > > to
> > > > >>> disable it was a factor here. However, as you pointed out, there
> > are
> > > > some
> > > > >>> major considerations that users should be aware of, so opt-in
> > doesn't
> > > > >> seem
> > > > >>> like a bad choice, either. You could add an Enum argument to
> those
> > > > >>> factories like `RocksDBTransactionalMechanism.{NONE,
> > > > >>>
> > > > >>> Some points in favor of this approach:
> > > > >>> * Avoid "stores that don't support transactions ignore the
> config"
> > > > >>> complexity
> > > > >>> * Users can choose how to spend their memory budget, making some
> > > stores
> > > > >>> transactional and others not
> > > > >>> * When we add transactional support to in-memory stores, we don't
> > > have
> > > > to
> > > > >>> figure out what to do with the mechanism config (i.e., what do
> you
> > > set
> > > > >> the
> > > > >>> mechanism to when there are multiple kinds of transactional
> stores
> > in
> > > > the
> > > > >>> topology?)
> > > > >>>
> > > > >>> 2. caching/flushing/transactions
> > > > >>> The coupling between memory usage and flushing that you mentioned
> > is
> > > a
> > > > >> bit
> > > > >>> troubling. It also occurs to me that there seems to be some
> > > > relationship
> > > > >>> with the existing record cache, which is also an in-memory
> holding
> > > area
> > > > >> for
> > > > >>> records that are not yet written to the cache and/or store
> (albeit
> > > with
> > > > >> no
> > > > >>> particular semantics). Have you considered how all these
> components
> > > > >> should
> > > > >>> relate? For example, should a "full" WriteBatch actually trigger
> a
> > > > flush
> > > > >> so
> > > > >>> that we don't get OOMEs? If the proposed transactional mechanism
> > > forces
> > > > >> all
> > > > >>> uncommitted writes to be buffered in memory, until a commit, then
> > > what
> > > > is
> > > > >>> the advantage over just doing the same thing with the RecordCache
> > and
> > > > not
> > > > >>> introducing the WriteBatch at all?
> > > > >>>
> > > > >>> 3. ALOS
> > > > >>> You mentioned that a transactional store can help reduce
> > duplication
> > > in
> > > > >>> the case of ALOS. We might want to be careful about claims like
> > that.
> > > > >>> Duplication isn't the way that repeated processing manifests in
> > state
> > > > >>> stores. Rather, it is in the form of dirty reads during
> > reprocessing.
> > > > >> This
> > > > >>> feature may reduce the incidence of dirty reads during
> > reprocessing,
> > > > but
> > > > >>> not in a predictable way. During regular processing today, we
> will
> > > send
> > > > >>> some records through to the changelog in between commit
> intervals.
> > > > Under
> > > > >>> ALOS, if any of those dirty writes gets committed to the
> changelog
> > > > topic,
> > > > >>> then upon failure, we have to roll the store forward to them
> > anyway,
> > > > >>> regardless of this new transactional mechanism. That's a fixable
> > > > problem,
> > > > >>> by the way, but this KIP doesn't seem to fix it. I wonder if we
> > > should
> > > > >> make
> > > > >>> any claims about the relationship of this feature to ALOS if the
> > > > >> real-world
> > > > >>> behavior is so complex.
> > > > >>>
> > > > >>> 4. IQ
> > > > >>> As a reminder, we have a new IQv2 mechanism now. Should we
> propose
> > > any
> > > > >>> changes to IQv1 to support this transactional mechanism, versus
> > just
> > > > >>> proposing it for IQv2? Certainly, it seems strange only to
> propose
> > a
> > > > >> change
> > > > >>> for IQv1 and not v2.
> > > > >>>
> > > > >>> Regarding your proposal for IQv1, I'm unsure what the behavior
> > should
> > > > be
> > > > >>> for readCommitted, since the current behavior also reads out of
> the
> > > > >>> RecordCache. I guess if readCommitted==false, then we will
> continue
> > > to
> > > > >> read
> > > > >>> from the cache first, then the Batch, then the store; and if
> > > > >>> readCommitted==true, we would skip the cache and the Batch and
> only
> > > > read
> > > > >>> from the persistent RocksDB store?
> > > > >>>
> > > > >>> What should IQ do if I request to readCommitted on a
> > > non-transactional
> > > > >>> store?
> > > > >>>
> > > > >>> Thanks again for proposing the KIP, and my apologies for the long
> > > > reply;
> > > > >>> I'm hoping to air all my concerns in one "batch" to save time for
> > > you.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> -John
> > > > >>>
> > > > >>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
> > > > >>>> Hi all,
> > > > >>>>
> > > > >>>> I've written a KIP for making Kafka Streams state stores
> > > transactional
> > > > >>> and
> > > > >>>> would like to start a discussion:
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Alex
> > > > >>>
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > [image: Confluent] <https://www.confluent.io>
> > > Suhas Satish
> > > Engineering Manager
> > > Follow us: [image: Blog]
> > > <
> > >
> >
> https://www.confluent.io/blog?utm_source=footer&utm_medium=email&utm_campaign=ch.email-signature_type.community_content.blog
> > > >[image:
> > > Twitter] <https://twitter.com/ConfluentInc>[image: LinkedIn]
> > > <https://www.linkedin.com/company/confluent/>
> > >
> > > [image: Try Confluent Cloud for Free]
> > > <
> > >
> >
> https://www.confluent.io/get-started?utm_campaign=tm.fm-apac_cd.inbound&utm_source=gmail&utm_medium=organic
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>

Reply via email to