Hello Alex,

Thanks for writing the proposal! Glad to see it coming. I think this is the
kind of KIP where too many devils are buried in the details, so it's better
to start working on a POC, either in parallel with or before resuming our
discussion, rather than blocking any implementation until we are satisfied
with the proposal.

Just as a concrete example, I personally am still not 100% clear how the
proposal would achieve EOS with the state stores. For reference, the
commit procedure today looks like this:

0. There's an existing checkpoint file indicating that the changelog offset
of the local state store image is 100. Now a commit is triggered:
1. Flush the cache (since it contains partially processed records), making
sure all records are written to the producer.
2. Flush the producer, making sure all changelog records have been acked. //
here we would get the new changelog position, say 200
3. Flush the store, making sure all writes are persisted.
4. producer.sendOffsetsToTransaction(); producer.commitTransaction(); // this
makes the writes in the changelog up to offset 200 committed
5. Update the checkpoint file to 200.
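To make the ordering concrete, the five steps above could be sketched as a
single method; all class and field names here are illustrative stand-ins, not
actual Kafka Streams APIs:

```java
// Illustrative sketch of the commit sequence above (not real Kafka Streams code).
public class CommitSequenceSketch {
    public long checkpoint = 100;        // step 0: checkpoint file says offset 100
    public long changelogPosition = 100; // offset of the last changelog record written
    public boolean storeFlushed = false;
    public boolean txnCommitted = false;

    public void commit() {
        // step 1: flush the cache so partially processed records reach the producer
        // step 2: flush the producer; all changelog records acked, position is now 200
        changelogPosition = 200;
        // step 3: flush the store so all local writes are persisted
        storeFlushed = true;
        // step 4: producer.sendOffsetsToTransaction(); producer.commitTransaction();
        txnCommitted = true;
        // step 5: update the checkpoint file to the new changelog position
        checkpoint = changelogPosition;
    }
}
```

Note that each step only becomes durable when it completes, which is exactly
why the crash windows between steps matter.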

The question is about atomicity between those steps, for example:

* If we crash between steps 4 and 5, the local checkpoint file would
stay at 100, and upon recovery we would replay the changelog from 100 to
200. This is not ideal but does not violate EOS, since the changelog
records are all overwrites anyway.
* If we crash between steps 3 and 4, then at that time the local persistent
store image represents offset 200, but upon recovery all changelog records
from 100 to the log-end-offset would be considered aborted and not be
replayed, and we would restart processing from position 100. Restarting
processing would then violate EOS.

I'm not sure how e.g. RocksDB's WriteBatchWithIndex would make sure that
steps 4 and 5 could be done atomically here.
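The two crash windows can be captured in a toy model; the offsets and the
EOS check are illustrative assumptions, not anything from the KIP:

```java
// Toy model of the crash windows discussed above (offsets are illustrative).
public class CrashWindowSketch {
    // Returns true iff EOS is preserved after crashing right after the given step.
    public static boolean eosPreserved(int crashAfterStep) {
        long storeImageOffset   = (crashAfterStep >= 3) ? 200 : 100; // step 3: store flushed
        long committedChangelog = (crashAfterStep >= 4) ? 200 : 100; // step 4: txn committed
        // On recovery, changelog records beyond the committed offset are treated
        // as aborted, and processing restarts from the committed position. If the
        // persisted store image is already ahead of that position, reprocessing
        // re-applies updates on top of state that already includes them.
        long restartFrom = committedChangelog;
        return storeImageOffset <= restartFrom;
    }
}
```

In this model, a crash between steps 4 and 5 is recoverable (replayed
changelog records just overwrite), while a crash between steps 3 and 4
leaves the store ahead of the restart position.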

Originally, what I was thinking when creating the JIRA ticket was that we
need the state store to provide a transactional API like "token commit()",
used in step 4) above, which returns a token that, in our example,
indicates offset 200; that token would be written as part of the records
in the Kafka transaction in step 5). Upon recovery, the state store would
have another API like "rollback(token)", where the token is read from the
latest committed txn and used to roll the store back to that committed
image. I think your proposal is different: it seems like you're proposing
we swap steps 3) and 4) above, but the atomicity issue still remains,
since now you may have the store image at 100 while the changelog is
committed at 200. I'd like to learn more about the details of how it
resolves such issues.
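As a rough illustration of the token-based API described above, here is a
hypothetical in-memory store; the method names and the use of a changelog
offset as the token are assumptions for the sketch, not a proposed Kafka
interface:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a token-based transactional store (not Kafka APIs).
public class TokenStoreSketch {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();
    private long changelogOffset = 100; // offset covered by the committed image

    public void put(String key, String value) { uncommitted.put(key, value); }

    // "token commit()": atomically persist pending writes and return a token
    // identifying the committed image, e.g. the changelog offset it covers.
    public long commit(long newChangelogOffset) {
        committed.putAll(uncommitted);
        uncommitted.clear();
        changelogOffset = newChangelogOffset;
        return changelogOffset;
    }

    // "rollback(token)": discard uncommitted writes, restoring the store to the
    // image identified by the token read from the latest committed transaction.
    public void rollback(long token) {
        if (token == changelogOffset) uncommitted.clear();
    }

    public String get(String key) {
        String v = uncommitted.get(key);
        return (v != null) ? v : committed.get(key);
    }
}
```

The key property is that commit() and rollback(token) bracket exactly the
window where the crash scenarios above lose atomicity.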

Anyway, that's just an example to make the point that there are lots of
implementation details that would drive the public API design, and we
should probably first do a POC and then come back to discuss the KIP. Let
me know what you think.


Guozhang

On Tue, May 24, 2022 at 10:35 AM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Alexander,
>
> Thanks for the KIP! This seems like a great proposal. I have the same
> opinion as John on the Configuration part though. I think the 2 level
> config and its behaviour based on the setting/unsetting of the flag seems
> confusing to me as well. Since the KIP seems specifically centred around
> RocksDB it might be better to add it at the Supplier level as John
> suggested.
>
> On similar lines, this config name => *statestore.transactional.mechanism*
> may also need rethinking, as the value assigned to it (rocksdb_indexbatch)
> implicitly seems to assume that rocksdb is the only statestore that Kafka
> Streams supports, while that's not the case.
>
> Also, regarding the potential memory pressure that can be introduced by
> WriteBatchIndex, do you think it might make more sense to include some
> numbers/benchmarks on how much the memory consumption might increase?
>
> Lastly, the read_uncommitted flag's behaviour on IQ may need more
> elaboration.
>
> These points aside, as I said, this is a great proposal!
>
> Thanks!
> Sagar.
>
> On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks for the KIP, Alex!
> >
> > I'm really happy to see your proposal. This improvement fills a
> > long-standing gap.
> >
> > I have a few questions:
> >
> > 1. Configuration
> > The KIP only mentions RocksDB, but of course, Streams also ships with an
> > InMemory store, and users also plug in their own custom state stores. It
> is
> > also common to use multiple types of state stores in the same application
> > for different purposes.
> >
> > Against this backdrop, the choice to configure transactionality as a
> > top-level config, as well as to configure the store transaction mechanism
> > as a top-level config, seems a bit off.
> >
> > Did you consider instead just adding the option to the
> > RocksDB*StoreSupplier classes and the factories in Stores ? It seems like
> > the desire to enable the feature by default, but with a feature-flag to
> > disable it was a factor here. However, as you pointed out, there are some
> > major considerations that users should be aware of, so opt-in doesn't
> seem
> > like a bad choice, either. You could add an Enum argument to those
> > factories like `RocksDBTransactionalMechanism.{NONE,
> >
> > Some points in favor of this approach:
> > * Avoid "stores that don't support transactions ignore the config"
> > complexity
> > * Users can choose how to spend their memory budget, making some stores
> > transactional and others not
> > * When we add transactional support to in-memory stores, we don't have to
> > figure out what to do with the mechanism config (i.e., what do you set
> the
> > mechanism to when there are multiple kinds of transactional stores in the
> > topology?)
> >
> > 2. caching/flushing/transactions
> > The coupling between memory usage and flushing that you mentioned is a
> bit
> > troubling. It also occurs to me that there seems to be some relationship
> > with the existing record cache, which is also an in-memory holding area
> for
> > records that are not yet written to the cache and/or store (albeit with
> no
> > particular semantics). Have you considered how all these components
> should
> > relate? For example, should a "full" WriteBatch actually trigger a flush
> so
> > that we don't get OOMEs? If the proposed transactional mechanism forces
> all
> > uncommitted writes to be buffered in memory, until a commit, then what is
> > the advantage over just doing the same thing with the RecordCache and not
> > introducing the WriteBatch at all?
> >
> > 3. ALOS
> > You mentioned that a transactional store can help reduce duplication in
> > the case of ALOS. We might want to be careful about claims like that.
> > Duplication isn't the way that repeated processing manifests in state
> > stores. Rather, it is in the form of dirty reads during reprocessing.
> This
> > feature may reduce the incidence of dirty reads during reprocessing, but
> > not in a predictable way. During regular processing today, we will send
> > some records through to the changelog in between commit intervals. Under
> > ALOS, if any of those dirty writes gets committed to the changelog topic,
> > then upon failure, we have to roll the store forward to them anyway,
> > regardless of this new transactional mechanism. That's a fixable problem,
> > by the way, but this KIP doesn't seem to fix it. I wonder if we should
> make
> > any claims about the relationship of this feature to ALOS if the
> real-world
> > behavior is so complex.
> >
> > 4. IQ
> > As a reminder, we have a new IQv2 mechanism now. Should we propose any
> > changes to IQv1 to support this transactional mechanism, versus just
> > proposing it for IQv2? Certainly, it seems strange only to propose a
> change
> > for IQv1 and not v2.
> >
> > Regarding your proposal for IQv1, I'm unsure what the behavior should be
> > for readCommitted, since the current behavior also reads out of the
> > RecordCache. I guess if readCommitted==false, then we will continue to
> read
> > from the cache first, then the Batch, then the store; and if
> > readCommitted==true, we would skip the cache and the Batch and only read
> > from the persistent RocksDB store?
> >
> > What should IQ do if I request to readCommitted on a non-transactional
> > store?
> >
> > Thanks again for proposing the KIP, and my apologies for the long reply;
> > I'm hoping to air all my concerns in one "batch" to save time for you.
> >
> > Thanks,
> > -John
> >
> > On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
> > > Hi all,
> > >
> > > I've written a KIP for making Kafka Streams state stores transactional
> > and
> > > would like to start a discussion:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> > >
> > > Best,
> > > Alex
> >
>


-- 
-- Guozhang
