I'd also like to dump my thoughts on the topic of
"caching/flushing/transactions" that John raised, especially on how it
relates to range queries (I think some of this could be excluded from the
scope of KIP-844, but I'd like to see if we have some consensus on the
long-term vision):

0) Today, cached data that has not been flushed is definitely uncommitted,
but even data already flushed to the underlying persistent store may be
uncommitted, since flushing can be triggered asynchronously before the
commit.

1) For processing, store reads always need to return "uncommitted data",
since that's required by the processing semantics. As for range queries,
today they are only issued in these cases: a) session aggregation, b)
sliding aggregation, c) stream-stream join, d) any PAPI processor that does
reads.

2) For IQ, it's debatable whether state queries should return only
committed data or not.

3) When range queries are issued, since we need to maintain the returned
iteration ordering, we need to do a "merge-sort" between the cache and the
underlying store on the fly, which is quite costly. In fact, we sometimes
observe that the cost of such merge-sorted iterators for range queries
outweighs the benefits of buffering writes on the processing operators in
1) above.
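
To make the cost in 3) concrete, here is a minimal sketch of such an
on-the-fly merge. It is not the actual Streams implementation: the class
name, the use of two in-memory sorted maps to stand in for the cache and
the underlying store, and the "null value means tombstone" convention are
all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical sketch: both layers are modeled as sorted maps.
final class MergedRangeSketch {

    // Returns the entries in [from, to] in key order, merging the cache
    // range with the store range. On equal keys the cache entry wins since
    // it is the newer write; a null cache value is treated as a tombstone.
    static Map<String, String> range(NavigableMap<String, String> cache,
                                     NavigableMap<String, String> store,
                                     String from, String to) {
        Iterator<Map.Entry<String, String>> c =
            cache.subMap(from, true, to, true).entrySet().iterator();
        Iterator<Map.Entry<String, String>> s =
            store.subMap(from, true, to, true).entrySet().iterator();
        Map<String, String> out = new LinkedHashMap<>();

        Map.Entry<String, String> ce = c.hasNext() ? c.next() : null;
        Map.Entry<String, String> se = s.hasNext() ? s.next() : null;
        while (ce != null || se != null) {
            // Every step compares the heads of both iterators; this
            // per-record comparison is the overhead discussed in 3).
            int cmp = (ce == null) ? 1
                    : (se == null) ? -1
                    : ce.getKey().compareTo(se.getKey());
            if (cmp <= 0) {
                if (ce.getValue() != null) {
                    out.put(ce.getKey(), ce.getValue());
                }
                if (cmp == 0) {
                    // The store entry is shadowed by the cache entry.
                    se = s.hasNext() ? s.next() : null;
                }
                ce = c.hasNext() ? c.next() : null;
            } else {
                out.put(se.getKey(), se.getValue());
                se = s.hasNext() ? s.next() : null;
            }
        }
        return out;
    }
}
```

With a cache holding an update for "b" and a tombstone for "d", and a store
holding "a" through "e", a range over "a".."d" yields a, b (cached value)
and c. A range read that bypasses the cache would skip the comparison loop
entirely and just iterate the store.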

So I'm wondering whether, once we have introduced transactional state
stores, and as we may refactor our cache/state management (e.g. as we want
to support multi-versioned stores), we should consider the following:

* Maintain the state position (offset) of the task as committed, i.e. we
only advance the position upon committing the task. And hence:

* Let IQ queries access only committed data, subject to the indicated
position token. This means that if the current state's committed position
is 100 and its uncommitted position is 120, a query with position 90 would
read the data as of 100 (or as of 90 once we support multi-versioning)
directly, while a query with position 110 would wait for the committed
position to advance beyond 110.

* This also means IQ range queries can always bypass the cache layer.

* Make the caching layer write-through for the built-state stores of the
processors mentioned in 1) above, i.e. it only helps with read IOs, not
write IOs. Then processing's range reads would also bypass the cache layer,
while point lookups can still go to the cache layer first.

* In the long run, we may consider moving away from a per-thread cache
space, consolidating multiple logical stores into a single physical store,
and potentially decoupling caching from emitting. In that world we would
potentially have a single layer of whitebox cache for which we fully
control when and what to flush, and how that is aligned with committing,
and which hence knows which part of the cache is committed or uncommitted.
In this case committed look-up reads could still go to the cache layer,
since we know which cached records are below the committed position and
can hence be considered committed.
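
The committed-position rule for IQ described in the bullets above could be
sketched roughly as follows. This is only an illustration, not a proposed
API: the class and method names are hypothetical, a single long stands in
for the position token, and I'm assuming here that a query can be served as
soon as the committed position reaches the requested one.

```java
import java.util.OptionalLong;

// Hypothetical sketch of a task that only exposes committed state to IQ.
final class CommittedPositionSketch {
    private long committedPosition;
    private long uncommittedPosition;

    CommittedPositionSketch(long committedPosition, long uncommittedPosition) {
        this.committedPosition = committedPosition;
        this.uncommittedPosition = uncommittedPosition;
    }

    // Processing advances only the uncommitted position.
    void process(long offset) {
        uncommittedPosition = Math.max(uncommittedPosition, offset);
    }

    // Committing the task is the only way to advance the committed position.
    void commit() {
        committedPosition = uncommittedPosition;
    }

    // Returns the position the query is served at (without multi-versioning
    // that is always the committed position), or empty if the query has to
    // wait for a later commit.
    OptionalLong servedPosition(long requestedPosition) {
        return requestedPosition <= committedPosition
            ? OptionalLong.of(committedPosition)
            : OptionalLong.empty();
    }
}
```

With committed position 100 and uncommitted position 120, a query with
position 90 is served as of 100, a query with position 110 gets an empty
result (i.e. it has to wait), and after the next commit the same query is
served as of 120.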



Guozhang



On Tue, May 24, 2022 at 10:04 AM John Roesler <vvcep...@apache.org> wrote:

> Thanks for the KIP, Alex!
>
> I'm really happy to see your proposal. This improvement fills a
> long-standing gap.
>
> I have a few questions:
>
> 1. Configuration
> The KIP only mentions RocksDB, but of course, Streams also ships with an
> InMemory store, and users also plug in their own custom state stores. It is
> also common to use multiple types of state stores in the same application
> for different purposes.
>
> Against this backdrop, the choice to configure transactionality as a
> top-level config, as well as to configure the store transaction mechanism
> as a top-level config, seems a bit off.
>
> Did you consider instead just adding the option to the
> RocksDB*StoreSupplier classes and the factories in Stores ? It seems like
> the desire to enable the feature by default, but with a feature-flag to
> disable it was a factor here. However, as you pointed out, there are some
> major considerations that users should be aware of, so opt-in doesn't seem
> like a bad choice, either. You could add an Enum argument to those
> factories like `RocksDBTransactionalMechanism.{NONE,
>
> Some points in favor of this approach:
> * Avoid "stores that don't support transactions ignore the config"
> complexity
> * Users can choose how to spend their memory budget, making some stores
> transactional and others not
> * When we add transactional support to in-memory stores, we don't have to
> figure out what to do with the mechanism config (i.e., what do you set the
> mechanism to when there are multiple kinds of transactional stores in the
> topology?)
>
> 2. caching/flushing/transactions
> The coupling between memory usage and flushing that you mentioned is a bit
> troubling. It also occurs to me that there seems to be some relationship
> with the existing record cache, which is also an in-memory holding area for
> records that are not yet written to the cache and/or store (albeit with no
> particular semantics). Have you considered how all these components should
> relate? For example, should a "full" WriteBatch actually trigger a flush so
> that we don't get OOMEs? If the proposed transactional mechanism forces all
> uncommitted writes to be buffered in memory, until a commit, then what is
> the advantage over just doing the same thing with the RecordCache and not
> introducing the WriteBatch at all?
>
> 3. ALOS
> You mentioned that a transactional store can help reduce duplication in
> the case of ALOS. We might want to be careful about claims like that.
> Duplication isn't the way that repeated processing manifests in state
> stores. Rather, it is in the form of dirty reads during reprocessing. This
> feature may reduce the incidence of dirty reads during reprocessing, but
> not in a predictable way. During regular processing today, we will send
> some records through to the changelog in between commit intervals. Under
> ALOS, if any of those dirty writes gets committed to the changelog topic,
> then upon failure, we have to roll the store forward to them anyway,
> regardless of this new transactional mechanism. That's a fixable problem,
> by the way, but this KIP doesn't seem to fix it. I wonder if we should make
> any claims about the relationship of this feature to ALOS if the real-world
> behavior is so complex.
>
> 4. IQ
> As a reminder, we have a new IQv2 mechanism now. Should we propose any
> changes to IQv1 to support this transactional mechanism, versus just
> proposing it for IQv2? Certainly, it seems strange only to propose a change
> for IQv1 and not v2.
>
> Regarding your proposal for IQv1, I'm unsure what the behavior should be
> for readCommitted, since the current behavior also reads out of the
> RecordCache. I guess if readCommitted==false, then we will continue to read
> from the cache first, then the Batch, then the store; and if
> readCommitted==true, we would skip the cache and the Batch and only read
> from the persistent RocksDB store?
>
> What should IQ do if I request to readCommitted on a non-transactional
> store?
>
> Thanks again for proposing the KIP, and my apologies for the long reply;
> I'm hoping to air all my concerns in one "batch" to save time for you.
>
> Thanks,
> -John
>
> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
> > Hi all,
> >
> > I've written a KIP for making Kafka Streams state stores transactional
> > and would like to start a discussion:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> >
> > Best,
> > Alex
>


-- 
-- Guozhang
