Hello Alex,

Thanks for the updated KIP. I looked over it, browsed the WIP PR, and
have a couple of meta thoughts:

1) About the param passed into the `recover()` function: it seems to me
that the semantics of "recover(offset)" is: recover this state to a
transaction boundary which is at least the passed-in offset. The only
case in which the returned offset differs from the passed-in offset is
when the previous failure happened after we had done all the commit
procedures except writing the new checkpoint, in which case the returned
offset would be larger than the passed-in offset. Otherwise it should
always be equal to the passed-in offset. Is that right?

2) It seems the only use for the "transactional()" function is to determine
if we can update the checkpoint file while in EOS. But the purpose of the
checkpoint file's offsets is just to tell "the local state's current
snapshot's progress is at least the indicated offsets" anyways, and with
this KIP maybe we would just do:

a) when in ALOS, upon failover: we set the starting offset as
checkpointed-offset, then restore() from changelog till the end-offset.
This way we may restore some records twice.
b) when in EOS, upon failover: we first call recover(checkpointed-offset),
then set the starting offset as the returned offset (which may be larger
than checkpointed-offset), then restore until the end-offset.
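
To make a) and b) concrete, here is a rough sketch of how the starting
offset could be picked. It is only an illustration, not actual Streams code:
the class and method names are made up, `recover` is a stand-in for the
proposed `recover(offset)`, and the sanity check assumes my reading in 1)
(the returned offset is never smaller than the passed-in one) is right.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class RestoreStartSketch {

    /**
     * Picks the changelog offset from which restoration should start.
     * `recover` stands in for the proposed StateStore#recover(offset).
     */
    static long startingOffset(boolean eosEnabled,
                               long checkpointedOffset,
                               LongUnaryOperator recover) {
        if (!eosEnabled) {
            // a) ALOS: start from the checkpoint; some records may be restored twice.
            return checkpointedOffset;
        }
        // b) EOS: let the store recover first, then start from where it ended up.
        long recovered = recover.applyAsLong(checkpointedOffset);
        if (recovered < checkpointedOffset) {
            // Assumed contract from 1): recover() never goes below the passed-in offset.
            throw new IllegalStateException(
                "recover() returned " + recovered + ", below checkpoint " + checkpointedOffset);
        }
        return recovered;
    }
}
```

In both cases the restore would then run from the returned offset until the
end-offset of the changelog.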

So why not also:
c) we let the `commit()` function also return an offset, which indicates
the "checkpointable offset".
d) for existing non-transactional stores, we just have a default
implementation of "commit()" which is simply a flush, and returns a
sentinel value like -1. Then later if we get a checkpointable offset of -1,
we do not write the checkpoint. Upon clean shutdown we can just
checkpoint regardless of the returned value from "commit".
e) for existing non-transactional stores, we just have a default
implementation of "recover()" which wipes out the local store and
returns offset 0 if the passed-in offset is -1; otherwise (if it is not -1)
it indicates a clean shutdown in the last run, and this function can just be
a no-op.

In that case, we would not need the "transactional()" function anymore,
since for non-transactional stores their behaviors are still wrapped in the
`commit / recover` function pairs.
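
A minimal sketch of what the defaults in d) and e) might look like, just to
illustrate the idea. Everything here is hypothetical: `SketchStore`,
`wipe()`, `MapStore` and `CheckpointSketch` are made-up names rather than
the real StateStore API, and returning the passed-in offset from the no-op
branch of `recover()` is my assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical stand-in for StateStore; only the parts needed for the sketch.
interface SketchStore {
    long NOT_CHECKPOINTABLE = -1L; // sentinel from d)

    void flush();

    void wipe(); // hypothetical: drop all local state

    // d) default for non-transactional stores: flush, report nothing checkpointable.
    default long commit() {
        flush();
        return NOT_CHECKPOINTABLE;
    }

    // e) default for non-transactional stores.
    default long recover(long checkpointedOffset) {
        if (checkpointedOffset == NOT_CHECKPOINTABLE) {
            wipe();      // no usable checkpoint: start over from offset 0
            return 0L;
        }
        return checkpointedOffset; // clean shutdown last time: no-op (assumed return value)
    }
}

// Toy non-transactional store that relies on the defaults above.
final class MapStore implements SketchStore {
    final Map<String, String> data = new HashMap<>();
    int flushes = 0;

    @Override
    public void flush() {
        flushes++;
    }

    @Override
    public void wipe() {
        data.clear();
    }
}

final class CheckpointSketch {
    // Returns the offset to write to the checkpoint file, or empty to skip the write.
    static OptionalLong offsetToCheckpoint(SketchStore store, long currentOffset, boolean cleanShutdown) {
        long checkpointable = store.commit();
        if (cleanShutdown) {
            return OptionalLong.of(currentOffset); // checkpoint regardless of commit()'s result
        }
        return checkpointable == SketchStore.NOT_CHECKPOINTABLE
            ? OptionalLong.empty()
            : OptionalLong.of(checkpointable);
    }
}
```

A transactional store would override both methods, and the caller would not
need to ask the store whether it is transactional.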

I have not completed a thorough pass on your WIP PR, so I may come up
with more feedback later, but please let me know whether my
understanding above is correct.


Guozhang




On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

> Hi,
>
> I updated the KIP with the following changes:
> * Replaced in-memory batches with the secondary-store approach as the
> default implementation to address the feedback about memory pressure as
> suggested by Sagar and Bruno.
> * Introduced StateStore#commit and StateStore#recover methods as an
> extension of the rollback idea. @Guozhang, please see the comment below on
> why I took a slightly different approach than you suggested.
> * Removed mentions of changes to IQv1 and IQv2. Transactional state stores
> enable reading committed in IQ, but it is really an independent feature
> that deserves its own KIP. Conflating them unnecessarily increases the
> scope for discussion, implementation, and testing in a single unit of work.
>
> I also published a prototype - https://github.com/apache/kafka/pull/12393
> that implements changes described in the proposal.
>
> Regarding explicit rollback, I think it is a powerful idea that allows
> other StateStore implementations to take a different path to the
> transactional behavior rather than keep 2 state stores. Instead of
> introducing a new commit token, I suggest using a changelog offset that
> already 1:1 corresponds to the materialized state. This works nicely
> because Kafka Streams first commits an AK transaction and only then
> checkpoints the state store, so we can use the changelog offset to commit
> the state store transaction.
>
> I called the method StateStore#recover rather than StateStore#rollback
> because a state store might either roll back or forward depending on the
> specific point of the crash failure. Consider the write algorithm in Kafka
> Streams:
> 1. write stuff to the state store
> 2. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
> 3. flush
> 4. checkpoint
>
> Let's consider 3 cases:
> 1. If the crash failure happens between #2 and #3, the state store rolls
> back and replays the uncommitted transaction from the changelog.
> 2. If the crash failure happens during #3, the state store can roll forward
> and finish the flush/commit.
> 3. If the crash failure happens between #3 and #4, the state store should
> do nothing during recovery and just proceed with the checkpoint.
>
> Looking forward to your feedback,
> Alexander
>
> On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <
> asorokou...@confluent.io> wrote:
>
> > Hi,
> >
> > As a status update, I did the following changes to the KIP:
> > * replaced configuration via the top-level config with configuration via
> > Stores factory and StoreSuppliers,
> > * added IQv2 and elaborated how readCommitted will work when the store is
> > not transactional,
> > * removed claims about ALOS.
> >
> > I am going to be OOO in the next couple of weeks and will resume working
> > on the proposal and responding to the discussion in this thread starting
> > June 27. My next top priorities are:
> > 1. Prototype the rollback approach as suggested by Guozhang.
> > 2. Replace in-memory batches with the secondary-store approach as the
> > default implementation to address the feedback about memory pressure as
> > suggested by Sagar and Bruno.
> > 3. Adjust Stores methods to make transactional implementations pluggable.
> > 4. Publish the POC for the first review.
> >
> > Best regards,
> > Alex
> >
> > On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Alex,
> >>
> >> Thanks for your replies! That is very helpful.
> >>
> >> Just to broaden our discussions a bit here, I think there are some other
> >> approaches in parallel to the idea of "enforce to only persist upon
> >> explicit flush" and I'd like to throw one here -- not really advocating
> >> it,
> >> but just for us to compare the pros and cons:
> >>
> >> 1) We let the StateStore's `flush` function to return a token instead of
> >> returning `void`.
> >> 2) We add another `rollback(token)` interface of StateStore which would
> >> effectively rollback the state as indicated by the token to the snapshot
> >> when the corresponding `flush` is called.
> >> 3) We encode the token and commit as part of
> >> `producer#sendOffsetsToTransaction`.
> >>
> >> Users could optionally implement the new functions, or they can just not
> >> return the token at all and not implement the second function. Again,
> the
> >> APIs are just for the sake of illustration, not feeling they are the
> most
> >> natural :)
> >>
> >> Then the procedure would be:
> >>
> >> 1. the previous checkpointed offset is 100
> >> ...
> >> 3. flush store, make sure all writes are persisted; get the returned
> token
> >> that indicates the snapshot of 200.
> >> 4. producer.sendOffsetsToTransaction(token);
> producer.commitTransaction();
> >> 5. Update the checkpoint file (say, the new value is 200).
> >>
> >> Then if there's a failure, say between 3/4, we would get the token from
> >> the
> >> last committed txn, and first we would do the restoration (which may get
> >> the state to somewhere between 100 and 200), then call
> >> `store.rollback(token)` to rollback to the snapshot of offset 100.
> >>
> >> The pros is that we would then not need to enforce the state stores to
> not
> >> persist any data during the txn: for stores that may not be able to
> >> implement the `rollback` function, they can still reduce its impl to
> "not
> >> persisting any data" via this API, but for stores that can indeed
> support
> >> the rollback, their implementation may be more efficient. The cons
> though,
> >> on top of my head are 1) more complicated logic differentiating between
> >> EOS
> >> with and without store rollback support, and ALOS, 2) encoding the token
> >> as
> >> part of the commit offset is not ideal if it is big, 3) the recovery
> logic
> >> including the state store is also a bit more complicated.
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >>
> >>
> >> On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov
> >> <asorokou...@confluent.io.invalid> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> > But I'm still trying to clarify how it guarantees EOS, and it seems
> >> that we
> >> > > would achieve it by enforcing to not persist any data written within
> >> this
> >> > > transaction until step 4. Is that correct?
> >> >
> >> >
> >> > This is correct. Both alternatives - in-memory WriteBatchWithIndex and
> >> > transactionality via the secondary store guarantee EOS by not
> persisting
> >> > data in the "main" state store until it is committed in the changelog
> >> > topic.
> >> >
> >> > Oh what I meant is not what KStream code does, but that StateStore
> impl
> >> > > classes themselves could potentially flush data to become persisted
> >> > > asynchronously
> >> >
> >> >
> >> > Thank you for elaborating! You are correct, the underlying state store
> >> > should not persist data until the streams app calls StateStore#flush.
> >> There
> >> > are 2 options how a State Store implementation can guarantee that -
> >> either
> >> > keep uncommitted writes in memory or be able to roll back the changes
> >> that
> >> > were not committed during recovery. RocksDB's WriteBatchWithIndex is
> an
> >> > implementation of the first option. A considered alternative,
> >> Transactions
> >> > via Secondary State Store for Uncommitted Changes, is the way to
> >> implement
> >> > the second option.
> >> >
> >> > As everyone correctly pointed out, keeping uncommitted data in memory
> >> > introduces a very real risk of OOM that we will need to handle. The
> >> more I
> >> > think about it, the more I lean towards going with the Transactions
> via
> >> > Secondary Store as the way to implement transactionality as it does
> not
> >> > have that issue.
> >> >
> >> > Best,
> >> > Alex
> >> >
> >> >
> >> > On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >> >
> >> > > Hello Alex,
> >> > >
> >> > > > we flush the cache, but not the underlying state store.
> >> > >
> >> > > You're right. The ordering I mentioned above is actually:
> >> > >
> >> > > ...
> >> > > 3. producer.sendOffsetsToTransaction();
> producer.commitTransaction();
> >> > > 4. flush store, make sure all writes are persisted.
> >> > > 5. Update the checkpoint file to 200.
> >> > >
> >> > > But I'm still trying to clarify how it guarantees EOS, and it seems
> >> that
> >> > we
> >> > > would achieve it by enforcing to not persist any data written within
> >> this
> >> > > transaction until step 4. Is that correct?
> >> > >
> >> > > > Can you please point me to the place in the codebase where we
> >> trigger
> >> > > async flush before the commit?
> >> > >
> >> > > Oh what I meant is not what KStream code does, but that StateStore
> >> impl
> >> > > classes themselves could potentially flush data to become persisted
> >> > > asynchronously, e.g. RocksDB does that naturally out of the control
> of
> >> > > KStream code. I think it is related to my previous question: if we
> >> think
> >> > by
> >> > > guaranteeing EOS at the state store level, we would effectively ask
> >> the
> >> > > impl classes that "you should not persist any data until `flush` is
> >> > called
> >> > > explicitly", is the StateStore interface the right level to enforce
> >> such
> >> > > mechanisms, or should we just do that on top of the StateStores,
> e.g.
> >> > > during the transaction we just keep all the writes in the cache (of
> >> > course
> >> > > we need to consider how to work around memory pressure as previously
> >> > > mentioned), and then upon committing, we just write the cached
> records
> >> > as a
> >> > > whole into the store and then call flush.
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov
> >> > > <asorokou...@confluent.io.invalid> wrote:
> >> > >
> >> > > > Hey,
> >> > > >
> >> > > > Thank you for the wealth of great suggestions and questions! I am
> >> going
> >> > > to
> >> > > > address the feedback in batches and update the proposal async, as
> >> it is
> >> > > > probably going to be easier for everyone. I will also write a
> >> separate
> >> > > > message after making updates to the KIP.
> >> > > >
> >> > > > @John,
> >> > > >
> >> > > > > Did you consider instead just adding the option to the
> >> > > > > RocksDB*StoreSupplier classes and the factories in Stores ?
> >> > > >
> >> > > > Thank you for suggesting that. I think that this idea is better
> than
> >> > > what I
> >> > > > came up with and will update the KIP with configuring
> >> transactionality
> >> > > via
> >> > > > the suppliers and Stores.
> >> > > >
> >> > > > what is the advantage over just doing the same thing with the
> >> > RecordCache
> >> > > > > and not introducing the WriteBatch at all?
> >> > > >
> >> > > > Can you point me to RecordCache? I can't find it in the project.
> The
> >> > > > advantage would be that WriteBatch guarantees write atomicity. As
> >> far
> >> > as
> >> > > I
> >> > > > understood the way RecordCache works, it might leave the system in
> >> an
> >> > > > inconsistent state during crash failure on write.
> >> > > >
> >> > > > You mentioned that a transactional store can help reduce
> >> duplication in
> >> > > the
> >> > > > > case of ALOS
> >> > > >
> >> > > > I will remove claims about ALOS from the proposal. Thank you for
> >> > > > elaborating!
> >> > > >
> >> > > > As a reminder, we have a new IQv2 mechanism now. Should we propose
> >> any
> >> > > > > changes to IQv1 to support this transactional mechanism, versus
> >> just
> >> > > > > proposing it for IQv2? Certainly, it seems strange only to
> >> propose a
> >> > > > change
> >> > > > > for IQv1 and not v2.
> >> > > >
> >> > > >
> >> > > >  I will update the proposal with complementary API changes for
> IQv2
> >> > > >
> >> > > > What should IQ do if I request to readCommitted on a
> >> non-transactional
> >> > > > > store?
> >> > > >
> >> > > > We can assume that non-transactional stores commit on write, so IQ
> >> > works
> >> > > in
> >> > > > the same way with non-transactional stores regardless of the value
> >> of
> >> > > > readCommitted.
> >> > > >
> >> > > >
> >> > > >  @Guozhang,
> >> > > >
> >> > > > * If we crash between line 3 and 4, then at that time the local
> >> > > persistent
> >> > > > > store image is representing as of offset 200, but upon recovery
> >> all
> >> > > > > changelog records from 100 to log-end-offset would be considered
> >> as
> >> > > > aborted
> >> > > > > and not be replayed and we would restart processing from
> position
> >> > 100.
> >> > > > > Restart processing will violate EOS. I'm not sure how e.g.
> >> RocksDB's
> >> > > > > WriteBatchWithIndex would make sure that the step 4 and step 5
> >> could
> >> > be
> >> > > > > done atomically here.
> >> > > >
> >> > > >
> >> > > > Could you please point me to the place in the codebase where a
> task
> >> > > flushes
> >> > > > the store before committing the transaction?
> >> > > > Looking at TaskExecutor (
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167
> >> > > > ),
> >> > > > StreamTask#prepareCommit (
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398
> >> > > > ),
> >> > > > and CachedStateStore (
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34
> >> > > > )
> >> > > > we flush the cache, but not the underlying state store. Explicit
> >> > > > StateStore#flush happens in AbstractTask#maybeWriteCheckpoint (
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
> >> > > > ).
> >> > > > Is there something I am missing here?
> >> > > >
> >> > > > Today all cached data that have not been flushed are not committed
> >> for
> >> > > > > sure, but even flushed data to the persistent underlying store
> may
> >> > also
> >> > > > be
> >> > > > > uncommitted since flushing can be triggered asynchronously
> before
> >> the
> >> > > > > commit.
> >> > > >
> >> > > > Can you please point me to the place in the codebase where we
> >> trigger
> >> > > async
> >> > > > flush before the commit? This would certainly be a reason to
> >> introduce
> >> > a
> >> > > > dedicated StateStore#commit method.
> >> > > >
> >> > > > Thanks again for the feedback. I am going to update the KIP and
> then
> >> > > > respond to the next batch of questions and suggestions.
> >> > > >
> >> > > > Best,
> >> > > > Alex
> >> > > >
> >> > > > On Mon, May 30, 2022 at 5:13 PM Suhas Satish
> >> > > <ssat...@confluent.io.invalid
> >> > > > >
> >> > > > wrote:
> >> > > >
> >> > > > > Thanks for the KIP proposal Alex.
> >> > > > > 1. Configuration default
> >> > > > >
> >> > > > > You mention applications using streams DSL with built-in rocksDB
> >> > state
> >> > > > > store will get transactional state stores by default when EOS is
> >> > > enabled,
> >> > > > > but the default implementation for apps using PAPI will fallback
> >> to
> >> > > > > non-transactional behavior.
> >> > > > > Shouldn't we have the same default behavior for both types of
> >> apps -
> >> > > DSL
> >> > > > > and PAPI?
> >> > > > >
> >> > > > > On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <
> cado...@apache.org
> >> >
> >> > > > wrote:
> >> > > > >
> >> > > > > > Thanks for the PR, Alex!
> >> > > > > >
> >> > > > > > I am also glad to see this coming.
> >> > > > > >
> >> > > > > >
> >> > > > > > 1. Configuration
> >> > > > > >
> >> > > > > > I would also prefer to restrict the configuration of
> >> transactional
> >> > on
> >> > > > > > the state store. Ideally, calling method transactional() on the
> >> > state
> >> > > > > > store would be enough. An option on the store builder would
> >> make it
> >> > > > > > possible to turn transactionality on and off (as John
> proposed).
> >> > > > > >
> >> > > > > >
> >> > > > > > 2. Memory usage in RocksDB
> >> > > > > >
> >> > > > > > This seems to be a major issue. We do not have any guarantee
> >> that
> >> > > > > > uncommitted writes fit into memory and I guess we will never
> >> have.
> >> > > What
> >> > > > > > happens when the uncommitted writes do not fit into memory?
> Does
> >> > > > RocksDB
> >> > > > > > throw an exception? Can we handle such an exception without
> >> > crashing?
> >> > > > > >
> >> > > > > > Does the RocksDB behavior even need to be included in this
> KIP?
> >> In
> >> > > the
> >> > > > > > end it is an implementation detail.
> >> > > > > >
> >> > > > > > What we should consider - though - is a memory limit in some
> >> form.
> >> > > And
> >> > > > > > what we do when the memory limit is exceeded.
> >> > > > > >
> >> > > > > >
> >> > > > > > 3. PoC
> >> > > > > >
> >> > > > > > I agree with Guozhang that a PoC is a good idea to better
> >> > understand
> >> > > > the
> >> > > > > > devils in the details.
> >> > > > > >
> >> > > > > >
> >> > > > > > Best,
> >> > > > > > Bruno
> >> > > > > >
> >> > > > > > On 25.05.22 01:52, Guozhang Wang wrote:
> >> > > > > > > Hello Alex,
> >> > > > > > >
> >> > > > > > > Thanks for writing the proposal! Glad to see it coming. I
> >> think
> >> > > this
> >> > > > is
> >> > > > > > the
> >> > > > > > > kind of a KIP that since too many devils would be buried in
> >> the
> >> > > > details
> >> > > > > > and
> >> > > > > > > it's better to start working on a POC, either in parallel,
> or
> >> > > before
> >> > > > we
> >> > > > > > > resume our discussion, rather than blocking any
> implementation
> >> > > until
> >> > > > we
> >> > > > > > are
> >> > > > > > > satisfied with the proposal.
> >> > > > > > >
> >> > > > > > > Just as a concrete example, I personally am still not 100%
> >> clear
> >> > > how
> >> > > > > the
> >> > > > > > > proposal would work to achieve EOS with the state stores.
> For
> >> > > > example,
> >> > > > > > the
> >> > > > > > > commit procedure today looks like this:
> >> > > > > > >
> >> > > > > > > 0: there's an existing checkpoint file indicating the
> >> changelog
> >> > > > offset
> >> > > > > of
> >> > > > > > > the local state store image is 100. Now a commit is
> triggered:
> >> > > > > > > 1. flush cache (since it contains partially processed
> >> records),
> >> > > make
> >> > > > > sure
> >> > > > > > > all records are written to the producer.
> >> > > > > > > 2. flush producer, making sure all changelog records have
> now
> >> > > acked.
> >> > > > //
> >> > > > > > > here we would get the new changelog position, say 200
> >> > > > > > > 3. flush store, make sure all writes are persisted.
> >> > > > > > > 4. producer.sendOffsetsToTransaction();
> >> > > producer.commitTransaction();
> >> > > > > //
> >> > > > > > we
> >> > > > > > > would make the writes in changelog up to offset 200
> committed
> >> > > > > > > 5. Update the checkpoint file to 200.
> >> > > > > > >
> >> > > > > > > The question about atomicity between those lines, for
> example:
> >> > > > > > >
> >> > > > > > > * If we crash between line 4 and line 5, the local
> checkpoint
> >> > file
> >> > > > > would
> >> > > > > > > stay as 100, and upon recovery we would replay the changelog
> >> from
> >> > > 100
> >> > > > > to
> >> > > > > > > 200. This is not ideal but does not violate EOS, since the
> >> > > changelogs
> >> > > > > are
> >> > > > > > > all overwrites anyways.
> >> > > > > > > * If we crash between line 3 and 4, then at that time the
> >> local
> >> > > > > > persistent
> >> > > > > > > store image is representing as of offset 200, but upon
> >> recovery
> >> > all
> >> > > > > > > changelog records from 100 to log-end-offset would be
> >> considered
> >> > as
> >> > > > > > aborted
> >> > > > > > > and not be replayed and we would restart processing from
> >> position
> >> > > > 100.
> >> > > > > > > Restart processing will violate EOS. I'm not sure how e.g.
> >> > RocksDB's
> >> > > > > > > WriteBatchWithIndex would make sure that the step 4 and
> step 5
> >> > > could
> >> > > > be
> >> > > > > > > done atomically here.
> >> > > > > > >
> >> > > > > > > Originally what I was thinking when creating the JIRA ticket
> >> is
> >> > > that
> >> > > > we
> >> > > > > > > need to let the state store to provide a transactional API
> >> like
> >> > > > "token
> >> > > > > > > commit()" used in step 4) above which returns a token, that
> >> e.g.
> >> > in
> >> > > > our
> >> > > > > > > example above indicates offset 200, and that token would be
> >> > written
> >> > > > as
> >> > > > > > part
> >> > > > > > > of the records in Kafka transaction in step 5). And upon
> >> recovery
> >> > > the
> >> > > > > > state
> >> > > > > > > store would have another API like "rollback(token)" where
> the
> >> > token
> >> > > > is
> >> > > > > > read
> >> > > > > > > from the latest committed txn, and be used to rollback the
> >> store
> >> > to
> >> > > > > that
> >> > > > > > > committed image. I think your proposal is different, and it
> >> seems
> >> > > > like
> >> > > > > > > you're proposing we swap step 3) and 4) above, but the
> >> atomicity
> >> > > > issue
> >> > > > > > > still remains since now you may have the store image at 100
> >> but
> >> > the
> >> > > > > > > changelog is committed at 200. I'd like to learn more about
> >> the
> >> > > > details
> >> > > > > > > on how it resolves such issues.
> >> > > > > > >
> >> > > > > > > Anyways, that's just an example to make the point that there
> >> are
> >> > > lots
> >> > > > > of
> >> > > > > > > implementational details which would drive the public API
> >> design,
> >> > > and
> >> > > > > we
> >> > > > > > > should probably first do a POC, and come back to discuss the
> >> KIP.
> >> > > Let
> >> > > > > me
> >> > > > > > > know what you think?
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Guozhang
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Tue, May 24, 2022 at 10:35 AM Sagar <
> >> > sagarmeansoc...@gmail.com>
> >> > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> Hi Alexander,
> >> > > > > > >>
> >> > > > > > >> Thanks for the KIP! This seems like a great proposal. I
> have
> >> the
> >> > > > same
> >> > > > > > >> opinion as John on the Configuration part though. I think
> >> the 2
> >> > > > level
> >> > > > > > >> config and its behaviour based on the setting/unsetting of
> >> the
> >> > > flag
> >> > > > > > seems
> >> > > > > > >> confusing to me as well. Since the KIP seems specifically
> >> > centred
> >> > > > > around
> >> > > > > > >> RocksDB it might be better to add it at the Supplier level
> as
> >> > John
> >> > > > > > >> suggested.
> >> > > > > > >>
> >> > > > > > >> On similar lines, this config name =>
> >> > > > > > *statestore.transactional.mechanism
> >> > > > > > >> *may
> >> > > > > > >> also need rethinking as the value assigned to
> >> > > it(rocksdb_indexbatch)
> >> > > > > > >> implicitly seems to assume that rocksdb is the only
> >> statestore
> >> > > that
> >> > > > > > Kafka
> >> > > > > > >> Stream supports while that's not the case.
> >> > > > > > >>
> >> > > > > > >> Also, regarding the potential memory pressure that can be
> >> > > introduced
> >> > > > > by
> >> > > > > > >> WriteBatchIndex, do you think it might make more sense to
> >> > include
> >> > > > some
> >> > > > > > >> numbers/benchmarks on how much the memory consumption might
> >> > > > increase?
> >> > > > > > >>
> >> > > > > > >> Lastly, the read_uncommitted flag's behaviour on IQ may
> need
> >> > more
> >> > > > > > >> elaboration.
> >> > > > > > >>
> >> > > > > > >> These points aside, as I said, this is a great proposal!
> >> > > > > > >>
> >> > > > > > >> Thanks!
> >> > > > > > >> Sagar.
> >> > > > > > >>
> >> > > > > > >> On Tue, May 24, 2022 at 10:35 PM John Roesler <
> >> > > vvcep...@apache.org>
> >> > > > > > wrote:
> >> > > > > > >>
> >> > > > > > >>> Thanks for the KIP, Alex!
> >> > > > > > >>>
> >> > > > > > >>> I'm really happy to see your proposal. This improvement
> >> fills a
> >> > > > > > >>> long-standing gap.
> >> > > > > > >>>
> >> > > > > > >>> I have a few questions:
> >> > > > > > >>>
> >> > > > > > >>> 1. Configuration
> >> > > > > > >>> The KIP only mentions RocksDB, but of course, Streams also
> >> > ships
> >> > > > with
> >> > > > > > an
> >> > > > > > >>> InMemory store, and users also plug in their own custom
> >> state
> >> > > > stores.
> >> > > > > > It
> >> > > > > > >> is
> >> > > > > > >>> also common to use multiple types of state stores in the
> >> same
> >> > > > > > application
> >> > > > > > >>> for different purposes.
> >> > > > > > >>>
> >> > > > > > >>> Against this backdrop, the choice to configure
> >> transactionality
> >> > > as
> >> > > > a
> >> > > > > > >>> top-level config, as well as to configure the store
> >> transaction
> >> > > > > > mechanism
> >> > > > > > >>> as a top-level config, seems a bit off.
> >> > > > > > >>>
> >> > > > > > >>> Did you consider instead just adding the option to the
> >> > > > > > >>> RocksDB*StoreSupplier classes and the factories in Stores
> ?
> >> It
> >> > > > seems
> >> > > > > > like
> >> > > > > > >>> the desire to enable the feature by default, but with a
> >> > > > feature-flag
> >> > > > > to
> >> > > > > > >>> disable it was a factor here. However, as you pointed out,
> >> > there
> >> > > > are
> >> > > > > > some
> >> > > > > > >>> major considerations that users should be aware of, so
> >> opt-in
> >> > > > doesn't
> >> > > > > > >> seem
> >> > > > > > >>> like a bad choice, either. You could add an Enum argument
> to
> >> > > those
> >> > > > > > >>> factories like `RocksDBTransactionalMechanism.{NONE,
> >> > > > > > >>>
> >> > > > > > >>> Some points in favor of this approach:
> >> > > > > > >>> * Avoid "stores that don't support transactions ignore the
> >> > > config"
> >> > > > > > >>> complexity
> >> > > > > > >>> * Users can choose how to spend their memory budget,
> making
> >> > some
> >> > > > > stores
> >> > > > > > >>> transactional and others not
> >> > > > > > >>> * When we add transactional support to in-memory stores,
> we
> >> > don't
> >> > > > > have
> >> > > > > > to
> >> > > > > > >>> figure out what to do with the mechanism config (i.e.,
> what
> >> do
> >> > > you
> >> > > > > set
> >> > > > > > >> the
> >> > > > > > >>> mechanism to when there are multiple kinds of
> transactional
> >> > > stores
> >> > > > in
> >> > > > > > the
> >> > > > > > >>> topology?)
> >> > > > > > >>>
> >> > > > > > >>> 2. caching/flushing/transactions
> >> > > > > > >>> The coupling between memory usage and flushing that you
> >> > mentioned
> >> > > > is
> >> > > > > a
> >> > > > > > >> bit
> >> > > > > > >>> troubling. It also occurs to me that there seems to be
> some
> >> > > > > > relationship
> >> > > > > > >>> with the existing record cache, which is also an in-memory
> >> > > holding
> >> > > > > area
> >> > > > > > >> for
> >> > > > > > >>> records that are not yet written to the cache and/or store
> >> > > (albeit
> >> > > > > with
> >> > > > > > >> no
> >> > > > > > >>> particular semantics). Have you considered how all these
> >> > > components
> >> > > > > > >> should
> >> > > > > > >>> relate? For example, should a "full" WriteBatch actually
> >> > trigger
> >> > > a
> >> > > > > > flush
> >> > > > > > >> so
> >> > > > > > >>> that we don't get OOMEs? If the proposed transactional
> >> > mechanism
> >> > > > > forces
> >> > > > > > >> all
> >> > > > > > >>> uncommitted writes to be buffered in memory, until a
> commit,
> >> > then
> >> > > > > what
> >> > > > > > is
> >> > > > > > >>> the advantage over just doing the same thing with the
> >> > RecordCache
> >> > > > and
> >> > > > > > not
> >> > > > > > >>> introducing the WriteBatch at all?
> >> > > > > > >>>
> >> > > > > > >>> 3. ALOS
> >> > > > > > >>> You mentioned that a transactional store can help reduce
> >> > > > duplication
> >> > > > > in
> >> > > > > > >>> the case of ALOS. We might want to be careful about claims
> >> like
> >> > > > that.
> >> > > > > > >>> Duplication isn't the way that repeated processing
> >> manifests in
> >> > > > state
> >> > > > > > >>> stores. Rather, it is in the form of dirty reads during
> >> > > > reprocessing.
> >> > > > > > >> This
> >> > > > > > >>> feature may reduce the incidence of dirty reads during
> >> > > > reprocessing,
> >> > > > > > but
> >> > > > > > >>> not in a predictable way. During regular processing today,
> >> we
> >> > > will
> >> > > > > send
> >> > > > > > >>> some records through to the changelog in between commit
> >> > > intervals.
> >> > > > > > Under
> >> > > > > > >>> ALOS, if any of those dirty writes gets committed to the
> >> > > changelog
> >> > > > > > topic,
> >> > > > > > >>> then upon failure, we have to roll the store forward to
> them
> >> > > > anyway,
> >> > > > > > >>> regardless of this new transactional mechanism. That's a
> >> > fixable
> >> > > > > > problem,
> >> > > > > > >>> by the way, but this KIP doesn't seem to fix it. I wonder
> >> if we
> >> > > > > should
> >> > > > > > >> make
> >> > > > > > >>> any claims about the relationship of this feature to ALOS
> if
> >> > the
> >> > > > > > >> real-world
> >> > > > > > >>> behavior is so complex.
> >> > > > > > >>>
> >> > > > > > >>> 4. IQ
> >> > > > > > >>> As a reminder, we have a new IQv2 mechanism now. Should we
> >> > > propose
> >> > > > > any
> >> > > > > > >>> changes to IQv1 to support this transactional mechanism,
> >> versus
> >> > > > just
> >> > > > > > >>> proposing it for IQv2? Certainly, it seems strange only to
> >> > > propose
> >> > > > a
> >> > > > > > >> change
> >> > > > > > >>> for IQv1 and not v2.
> >> > > > > > >>>
> >> > > > > > >>> Regarding your proposal for IQv1, I'm unsure what the
> >> behavior
> >> > > > should
> >> > > > > > be
> >> > > > > > >>> for readCommitted, since the current behavior also reads
> >> out of
> >> > > the
> >> > > > > > >>> RecordCache. I guess if readCommitted==false, then we will
> >> > > continue
> >> > > > > to
> >> > > > > > >> read
> >> > > > > > >>> from the cache first, then the Batch, then the store; and
> if
> >> > > > > > >>> readCommitted==true, we would skip the cache and the Batch
> >> and
> >> > > only
> >> > > > > > read
> >> > > > > > >>> from the persistent RocksDB store?
> >> > > > > > >>>
> >> > > > > > >>> What should IQ do if I request to readCommitted on a
> >> > > > > non-transactional
> >> > > > > > >>> store?
> >> > > > > > >>>
> >> > > > > > >>> Thanks again for proposing the KIP, and my apologies for
> the
> >> > long
> >> > > > > > reply;
> >> > > > > > >>> I'm hoping to air all my concerns in one "batch" to save
> >> time
> >> > for
> >> > > > > you.
> >> > > > > > >>>
> >> > > > > > >>> Thanks,
> >> > > > > > >>> -John
> >> > > > > > >>>
> >> > > > > > >>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov
> wrote:
> >> > > > > > >>>> Hi all,
> >> > > > > > >>>>
> >> > > > > > >>>> I've written a KIP for making Kafka Streams state stores
> >> > > > > transactional
> >> > > > > > >>> and
> >> > > > > > >>>> would like to start a discussion:
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> >> > > > > > >>>>
> >> > > > > > >>>> Best,
> >> > > > > > >>>> Alex
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > >
> >> > > > > Suhas Satish
> >> > > > > Engineering Manager
> >> > > > >
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>


-- 
-- Guozhang
