Thanks for the KIP proposal Alex.
1. Configuration default

You mention applications using streams DSL with built-in rocksDB state
store will get transactional state stores by default when EOS is enabled,
but the default implementation for apps using PAPI will fallback to
non-transactional behavior.
Shouldn't we have the same default behavior for both types of apps - DSL
and PAPI?

On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <cado...@apache.org> wrote:

> Thanks for the PR, Alex!
>
> I am also glad to see this coming.
>
>
> 1. Configuration
>
> I would also prefer to restrict the configuration of transactional on
> the state sore. Ideally, calling method transactional() on the state
> store would be enough. An option on the store builder would make it
> possible to turn transactionality on and off (as John proposed).
>
>
> 2. Memory usage in RocksDB
>
> This seems to be a major issue. We do not have any guarantee that
> uncommitted writes fit into memory and I guess we will never have. What
> happens when the uncommitted writes do not fit into memory? Does RocksDB
> throw an exception? Can we handle such an exception without crashing?
>
> Does the RocksDB behavior even need to be included in this KIP? In the
> end it is an implementation detail.
>
> What we should consider - though - is a memory limit in some form. And
> what we do when the memory limit is exceeded.
>
>
> 3. PoC
>
> I agree with Guozhang that a PoC is a good idea to better understand the
> devils in the details.
>
>
> Best,
> Bruno
>
> On 25.05.22 01:52, Guozhang Wang wrote:
> > Hello Alex,
> >
> > Thanks for writing the proposal! Glad to see it coming. I think this is
> the
> > kind of a KIP that since too many devils would be buried in the details
> and
> > it's better to start working on a POC, either in parallel, or before we
> > resume our discussion, rather than blocking any implementation until we
> are
> > satisfied with the proposal.
> >
> > Just as a concrete example, I personally am still not 100% clear how the
> > proposal would work to achieve EOS with the state stores. For example,
> the
> > commit procedure today looks like this:
> >
> > 0: there's an existing checkpoint file indicating the changelog offset of
> > the local state store image is 100. Now a commit is triggered:
> > 1. flush cache (since it contains partially processed records), make sure
> > all records are written to the producer.
> > 2. flush producer, making sure all changelog records have now acked. //
> > here we would get the new changelog position, say 200
> > 3. flush store, make sure all writes are persisted.
> > 4. producer.sendOffsetsToTransaction(); producer.commitTransaction(); //
> we
> > would make the writes in changelog up to offset 200 committed
> > 5. Update the checkpoint file to 200.
> >
> > The question about atomicity between those lines, for example:
> >
> > * If we crash between line 4 and line 5, the local checkpoint file would
> > stay as 100, and upon recovery we would replay the changelog from 100 to
> > 200. This is not ideal but does not violate EOS, since the changelogs are
> > all overwrites anyways.
> > * If we crash between line 3 and 4, then at that time the local
> persistent
> > store image is representing as of offset 200, but upon recovery all
> > changelog records from 100 to log-end-offset would be considered as
> aborted
> > and not be replayed and we would restart processing from position 100.
> > Restart processing will violate EOS.I'm not sure how e.g. RocksDB's
> > WriteBatchWithIndex would make sure that the step 4 and step 5 could be
> > done atomically here.
> >
> > Originally what I was thinking when creating the JIRA ticket is that we
> > need to let the state store to provide a transactional API like "token
> > commit()" used in step 4) above which returns a token, that e.g. in our
> > example above indicates offset 200, and that token would be written as
> part
> > of the records in Kafka transaction in step 5). And upon recovery the
> state
> > store would have another API like "rollback(token)" where the token is
> read
> > from the latest committed txn, and be used to rollback the store to that
> > committed image. I think your proposal is different, and it seems like
> > you're proposing we swap step 3) and 4) above, but the atomicity issue
> > still remains since now you may have the store image at 100 but the
> > changelog is committed at 200. I'd like to learn more about the details
> > on how it resolves such issues.
> >
> > Anyways, that's just an example to make the point that there are lots of
> > implementational details which would drive the public API design, and we
> > should probably first do a POC, and come back to discuss the KIP. Let me
> > know what you think?
> >
> >
> > Guozhang
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Tue, May 24, 2022 at 10:35 AM Sagar <sagarmeansoc...@gmail.com>
> wrote:
> >
> >> Hi Alexander,
> >>
> >> Thanks for the KIP! This seems like a great proposal. I have the same
> >> opinion as John on the Configuration part though. I think the 2 level
> >> config and its behaviour based on the setting/unsetting of the flag
> seems
> >> confusing to me as well. Since the KIP seems specifically centred around
> >> RocksDB it might be better to add it at the Supplier level as John
> >> suggested.
> >>
> >> On similar lines, this config name =>
> *statestore.transactional.mechanism
> >> *may
> >> also need rethinking as the value assigned to it(rocksdb_indexbatch)
> >> implicitly seems to assume that rocksdb is the only statestore that
> Kafka
> >> Stream supports while that's not the case.
> >>
> >> Also, regarding the potential memory pressure that can be introduced by
> >> WriteBatchIndex, do you think it might make more sense to include some
> >> numbers/benchmarks on how much the memory consumption might increase?
> >>
> >> Lastly, the read_uncommitted flag's behaviour on IQ may need more
> >> elaboration.
> >>
> >> These points aside, as I said, this is a great proposal!
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org>
> wrote:
> >>
> >>> Thanks for the KIP, Alex!
> >>>
> >>> I'm really happy to see your proposal. This improvement fills a
> >>> long-standing gap.
> >>>
> >>> I have a few questions:
> >>>
> >>> 1. Configuration
> >>> The KIP only mentions RocksDB, but of course, Streams also ships with
> an
> >>> InMemory store, and users also plug in their own custom state stores.
> It
> >> is
> >>> also common to use multiple types of state stores in the same
> application
> >>> for different purposes.
> >>>
> >>> Against this backdrop, the choice to configure transactionality as a
> >>> top-level config, as well as to configure the store transaction
> mechanism
> >>> as a top-level config, seems a bit off.
> >>>
> >>> Did you consider instead just adding the option to the
> >>> RocksDB*StoreSupplier classes and the factories in Stores ? It seems
> like
> >>> the desire to enable the feature by default, but with a feature-flag to
> >>> disable it was a factor here. However, as you pointed out, there are
> some
> >>> major considerations that users should be aware of, so opt-in doesn't
> >> seem
> >>> like a bad choice, either. You could add an Enum argument to those
> >>> factories like `RocksDBTransactionalMechanism.{NONE,
> >>>
> >>> Some points in favor of this approach:
> >>> * Avoid "stores that don't support transactions ignore the config"
> >>> complexity
> >>> * Users can choose how to spend their memory budget, making some stores
> >>> transactional and others not
> >>> * When we add transactional support to in-memory stores, we don't have
> to
> >>> figure out what to do with the mechanism config (i.e., what do you set
> >> the
> >>> mechanism to when there are multiple kinds of transactional stores in
> the
> >>> topology?)
> >>>
> >>> 2. caching/flushing/transactions
> >>> The coupling between memory usage and flushing that you mentioned is a
> >> bit
> >>> troubling. It also occurs to me that there seems to be some
> relationship
> >>> with the existing record cache, which is also an in-memory holding area
> >> for
> >>> records that are not yet written to the cache and/or store (albeit with
> >> no
> >>> particular semantics). Have you considered how all these components
> >> should
> >>> relate? For example, should a "full" WriteBatch actually trigger a
> flush
> >> so
> >>> that we don't get OOMEs? If the proposed transactional mechanism forces
> >> all
> >>> uncommitted writes to be buffered in memory, until a commit, then what
> is
> >>> the advantage over just doing the same thing with the RecordCache and
> not
> >>> introducing the WriteBatch at all?
> >>>
> >>> 3. ALOS
> >>> You mentioned that a transactional store can help reduce duplication in
> >>> the case of ALOS. We might want to be careful about claims like that.
> >>> Duplication isn't the way that repeated processing manifests in state
> >>> stores. Rather, it is in the form of dirty reads during reprocessing.
> >> This
> >>> feature may reduce the incidence of dirty reads during reprocessing,
> but
> >>> not in a predictable way. During regular processing today, we will send
> >>> some records through to the changelog in between commit intervals.
> Under
> >>> ALOS, if any of those dirty writes gets committed to the changelog
> topic,
> >>> then upon failure, we have to roll the store forward to them anyway,
> >>> regardless of this new transactional mechanism. That's a fixable
> problem,
> >>> by the way, but this KIP doesn't seem to fix it. I wonder if we should
> >> make
> >>> any claims about the relationship of this feature to ALOS if the
> >> real-world
> >>> behavior is so complex.
> >>>
> >>> 4. IQ
> >>> As a reminder, we have a new IQv2 mechanism now. Should we propose any
> >>> changes to IQv1 to support this transactional mechanism, versus just
> >>> proposing it for IQv2? Certainly, it seems strange only to propose a
> >> change
> >>> for IQv1 and not v2.
> >>>
> >>> Regarding your proposal for IQv1, I'm unsure what the behavior should
> be
> >>> for readCommitted, since the current behavior also reads out of the
> >>> RecordCache. I guess if readCommitted==false, then we will continue to
> >> read
> >>> from the cache first, then the Batch, then the store; and if
> >>> readCommitted==true, we would skip the cache and the Batch and only
> read
> >>> from the persistent RocksDB store?
> >>>
> >>> What should IQ do if I request to readCommitted on a non-transactional
> >>> store?
> >>>
> >>> Thanks again for proposing the KIP, and my apologies for the long
> reply;
> >>> I'm hoping to air all my concerns in one "batch" to save time for you.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
> >>>> Hi all,
> >>>>
> >>>> I've written a KIP for making Kafka Streams state stores transactional
> >>> and
> >>>> would like to start a discussion:
> >>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> >>>>
> >>>> Best,
> >>>> Alex
> >>>
> >>
> >
> >
>


-- 

[image: Confluent] <https://www.confluent.io>
Suhas Satish
Engineering Manager
Follow us: [image: Blog]
<https://www.confluent.io/blog?utm_source=footer&utm_medium=email&utm_campaign=ch.email-signature_type.community_content.blog>[image:
Twitter] <https://twitter.com/ConfluentInc>[image: LinkedIn]
<https://www.linkedin.com/company/confluent/>

[image: Try Confluent Cloud for Free]
<https://www.confluent.io/get-started?utm_campaign=tm.fm-apac_cd.inbound&utm_source=gmail&utm_medium=organic>

Reply via email to