Hi everyone,

I've drastically reduced the scope of this KIP to no longer include the
StateStore management of checkpointing. This can be added as a KIP later on
to further optimize the consistency and performance of state stores.

I've also added a section discussing some of the concerns around
concurrency, especially in the presence of Iterators. I'm thinking of
wrapping WriteBatchWithIndex with a reference-counting copy-on-write
implementation (that only makes a copy if there's an active iterator), but
I'm open to suggestions.

Regards,
Nick

On Mon, 28 Nov 2022 at 16:36, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Colt,
>
> I didn't do any profiling, but the 844 implementation:
>
>    - Writes uncommitted records to a temporary RocksDB instance
>       - Since tombstones need to be flagged, all record values are
>       prefixed with a value/tombstone marker. This necessitates a memory copy.
>    - On-commit, iterates all records in this temporary instance and
>    writes them to the main RocksDB store.
>    - While iterating, the value/tombstone marker needs to be parsed and
>    the real value extracted. This necessitates another memory copy.
>
> My guess is that the cost of iterating the temporary RocksDB store is the
> major factor, with the 2 extra memory copies per-Record contributing a
> significant amount too.
>
> Regards,
> Nick
>
> On Mon, 28 Nov 2022 at 16:12, Colt McNealy <c...@littlehorse.io> wrote:
>
>> Hi all,
>>
>> Out of curiosity, why does the performance of the store degrade so
>> significantly with the 844 implementation? I wouldn't be too surprised by
>> a
>> 50-60% drop (caused by each record being written twice), but 96% is
>> extreme.
>>
>> The only thing I can think of which could create such a bottleneck would
>> be
>> that perhaps the 844 implementation deserializes and then re-serializes
>> the
>> store values when copying from the uncommitted to committed store, but I
>> wasn't able to figure that out when I scanned the PR.
>>
>> Colt McNealy
>> *Founder, LittleHorse.io*
>>
>>
>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <nick.telf...@gmail.com>
>> wrote:
>>
>> > Hi everyone,
>> >
>> > I've updated the KIP to resolve all the points that have been raised so
>> > far, with one exception: the ALOS default commit interval of 5 minutes
>> is
>> > likely to cause WriteBatchWithIndex memory to grow too large.
>> >
>> > There's a couple of different things I can think of to solve this:
>> >
>> >    - We already have a memory/record limit in the KIP to prevent OOM
>> >    errors. Should we choose a default value for these? My concern here
>> is
>> > that
>> >    anything we choose might seem rather arbitrary. We could change
>> >    its behaviour such that under ALOS, it only triggers the commit of
>> the
>> >    StateStore, but under EOS, it triggers a commit of the Kafka
>> > transaction.
>> >    - We could introduce a separate `checkpoint.interval.ms` to allow
>> ALOS
>> >    to commit the StateStores more frequently than the general
>> >    commit.interval.ms? My concern here is that the semantics of this
>> > config
>> >    would depend on the processing.mode; under ALOS it would allow more
>> >    frequently committing stores, whereas under EOS it couldn't.
>> >
>> > Any better ideas?
>> >
>> > On Wed, 23 Nov 2022 at 16:25, Nick Telford <nick.telf...@gmail.com>
>> wrote:
>> >
>> > > Hi Alex,
>> > >
>> > > Thanks for the feedback.
>> > >
>> > > I've updated the discussion of OOM issues by describing how we'll
>> handle
>> > > it. Here's the new text:
>> > >
>> > > To mitigate this, we will automatically force a Task commit if the
>> total
>> > >> uncommitted records returned by
>> > >> StateStore#approximateNumUncommittedEntries()  exceeds a threshold,
>> > >> configured by max.uncommitted.state.entries.per.task; or the total
>> > >> memory used for buffering uncommitted records returned by
>> > >> StateStore#approximateNumUncommittedBytes() exceeds the threshold
>> > >> configured by max.uncommitted.state.bytes.per.task. This will roughly
>> > >> bound the memory required per-Task for buffering uncommitted records,
>> > >> irrespective of the commit.interval.ms, and will effectively bound
>> the
>> > >> number of records that will need to be restored in the event of a
>> > failure.
>> > >>
>> > >
>> > >
>> > > These limits will be checked in StreamTask#process and a premature
>> commit
>> > >> will be requested via Task#requestCommit().
>> > >>
>> > >
>> > >
>> > > Note that these new methods provide default implementations that
>> ensure
>> > >> existing custom stores and non-transactional stores (e.g.
>> > >> InMemoryKeyValueStore) do not force any early commits.
>> > >
>> > >
>> > > I've chosen to have the StateStore expose approximations of its buffer
>> > > size/count instead of opaquely requesting a commit in order to
>> delegate
>> > the
>> > > decision making to the Task itself. This enables Tasks to look at
>> *all*
>> > of
>> > > their StateStores, and determine whether an early commit is necessary.
>> > > Notably, it enables pre-Task thresholds, instead of per-Store, which
>> > > prevents Tasks with many StateStores from using much more memory than
>> > Tasks
>> > > with one StateStore. This makes sense, since commits are done by-Task,
>> > not
>> > > by-Store.
>> > >
>> > > Prizes* for anyone who can come up with a better name for the new
>> config
>> > > properties!
>> > >
>> > > Thanks for pointing out the potential performance issues of WBWI. From
>> > the
>> > > benchmarks that user posted[1], it looks like WBWI still performs
>> > > considerably better than individual puts, which is the existing
>> design,
>> > so
>> > > I'd actually expect a performance boost from WBWI, just not as great
>> as
>> > > we'd get from a plain WriteBatch. This does suggest that a good
>> > > optimization would be to use a regular WriteBatch for restoration (in
>> > > RocksDBStore#restoreBatch), since we know that those records will
>> never
>> > be
>> > > queried before they're committed.
>> > >
>> > > 1:
>> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>> > >
>> > > * Just kidding, no prizes, sadly.
>> > >
>> > > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov
>> > > <asorokou...@confluent.io.invalid> wrote:
>> > >
>> > >> Hey Nick,
>> > >>
>> > >> Thank you for the KIP! With such a significant performance
>> degradation
>> > in
>> > >> the secondary store approach, we should definitely consider
>> > >> WriteBatchWithIndex. I also like encapsulating checkpointing inside
>> the
>> > >> default state store implementation to improve performance.
>> > >>
>> > >> +1 to John's comment to keep the current checkpointing as a fallback
>> > >> mechanism. We want to keep existing users' workflows intact if we
>> can. A
>> > >> non-intrusive way would be to add a separate StateStore method, say,
>> > >> StateStore#managesCheckpointing(), that controls whether the state
>> store
>> > >> implementation owns checkpointing.
>> > >>
>> > >> I think that a solution to the transactional writes should address
>> the
>> > >> OOMEs. One possible way to address that is to wire StateStore's
>> commit
>> > >> request by adding, say, StateStore#commitNeeded that is checked in
>> > >> StreamTask#commitNeeded via the corresponding ProcessorStateManager.
>> > With
>> > >> that change, RocksDBStore will have to track the current transaction
>> > size
>> > >> and request a commit when the size goes over a (configurable)
>> threshold.
>> > >>
>> > >> AFAIU WriteBatchWithIndex might perform significantly slower than
>> > non-txn
>> > >> puts as the batch size grows [1]. We should have a configuration to
>> fall
>> > >> back to the current behavior (and/or disable txn stores for ALOS)
>> unless
>> > >> the benchmarks show negligible overhead for longer commits /
>> > large-enough
>> > >> batch sizes.
>> > >>
>> > >> If you prefer to keep the KIP smaller, I would rather cut out
>> > >> state-store-managed checkpointing rather than proper OOMe handling
>> and
>> > >> being able to switch to non-txn behavior. The checkpointing is not
>> > >> necessary to solve the recovery-under-EOS problem. On the other hand,
>> > once
>> > >> WriteBatchWithIndex is in, it will be much easier to add
>> > >> state-store-managed checkpointing.
>> > >>
>> > >> If you share the current implementation, I am happy to help you
>> address
>> > >> the
>> > >> OOMe and configuration parts as well as review and test the patch.
>> > >>
>> > >> Best,
>> > >> Alex
>> > >>
>> > >>
>> > >> 1. https://github.com/facebook/rocksdb/issues/608
>> > >>
>> > >> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com
>> >
>> > >> wrote:
>> > >>
>> > >> > Hi John,
>> > >> >
>> > >> > Thanks for the review and feedback!
>> > >> >
>> > >> > 1. Custom Stores: I've been mulling over this problem myself. As it
>> > >> stands,
>> > >> > custom stores would essentially lose checkpointing with no
>> indication
>> > >> that
>> > >> > they're expected to make changes, besides a line in the release
>> > notes. I
>> > >> > agree that the best solution would be to provide a default that
>> > >> checkpoints
>> > >> > to a file. The one thing I would change is that the checkpointing
>> is
>> > to
>> > >> a
>> > >> > store-local file, instead of a per-Task file. This way the
>> StateStore
>> > >> still
>> > >> > technically owns its own checkpointing (via a default
>> implementation),
>> > >> and
>> > >> > the StateManager/Task execution engine doesn't need to know
>> anything
>> > >> about
>> > >> > checkpointing, which greatly simplifies some of the logic.
>> > >> >
>> > >> > 2. OOME errors: The main reasons why I didn't explore a solution to
>> > >> this is
>> > >> > a) to keep this KIP as simple as possible, and b) because I'm not
>> > >> exactly
>> > >> > how to signal that a Task should commit prematurely. I'm confident
>> > it's
>> > >> > possible, and I think it's worth adding a section on handling this.
>> > >> Besides
>> > >> > my proposal to force an early commit once memory usage reaches a
>> > >> threshold,
>> > >> > is there any other approach that you might suggest for tackling
>> this
>> > >> > problem?
>> > >> >
>> > >> > 3. ALOS: I can add in an explicit paragraph, but my assumption is
>> that
>> > >> > since transactional behaviour comes at little/no cost, that it
>> should
>> > be
>> > >> > available by default on all stores, irrespective of the processing
>> > mode.
>> > >> > While ALOS doesn't use transactions, the Task itself still
>> "commits",
>> > so
>> > >> > the behaviour should be correct under ALOS too. I'm not convinced
>> that
>> > >> it's
>> > >> > worth having both transactional/non-transactional stores
>> available, as
>> > >> it
>> > >> > would considerably increase the complexity of the codebase, for
>> very
>> > >> little
>> > >> > benefit.
>> > >> >
>> > >> > 4. Method deprecation: Are you referring to
>> StateStore#getPosition()?
>> > >> As I
>> > >> > understand it, Position contains the position of the *source*
>> topics,
>> > >> > whereas the commit offsets would be the *changelog* offsets. So
>> it's
>> > >> still
>> > >> > necessary to retain the Position data, as well as the changelog
>> > offsets.
>> > >> > What I meant in the KIP is that Position offsets are currently
>> stored
>> > >> in a
>> > >> > file, and since we can atomically store metadata along with the
>> record
>> > >> > batch we commit to RocksDB, we can move our Position offsets in to
>> > this
>> > >> > metadata too, and gain the same transactional guarantees that we
>> will
>> > >> for
>> > >> > changelog offsets, ensuring that the Position offsets are
>> consistent
>> > >> with
>> > >> > the records that are read from the database.
>> > >> >
>> > >> > Regards,
>> > >> > Nick
>> > >> >
>> > >> > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org>
>> > wrote:
>> > >> >
>> > >> > > Thanks for publishing this alternative, Nick!
>> > >> > >
>> > >> > > The benchmark you mentioned in the KIP-844 discussion seems like
>> a
>> > >> > > compelling reason to revisit the built-in transactionality
>> > mechanism.
>> > >> I
>> > >> > > also appreciate you analysis, showing that for most use cases,
>> the
>> > >> write
>> > >> > > batch approach should be just fine.
>> > >> > >
>> > >> > > There are a couple of points that would hold me back from
>> approving
>> > >> this
>> > >> > > KIP right now:
>> > >> > >
>> > >> > > 1. Loss of coverage for custom stores.
>> > >> > > The fact that you can plug in a (relatively) simple
>> implementation
>> > of
>> > >> the
>> > >> > > XStateStore interfaces and automagically get a distributed
>> database
>> > >> out
>> > >> > of
>> > >> > > it is a significant benefit of Kafka Streams. I'd hate to lose
>> it,
>> > so
>> > >> it
>> > >> > > would be better to spend some time and come up with a way to
>> > preserve
>> > >> > that
>> > >> > > property. For example, can we provide a default implementation of
>> > >> > > `commit(..)` that re-implements the existing checkpoint-file
>> > >> approach? Or
>> > >> > > perhaps add an `isTransactional()` flag to the state store
>> interface
>> > >> so
>> > >> > > that the runtime can decide whether to continue to manage
>> checkpoint
>> > >> > files
>> > >> > > vs delegating transactionality to the stores?
>> > >> > >
>> > >> > > 2. Guarding against OOME
>> > >> > > I appreciate your analysis, but I don't think it's sufficient to
>> say
>> > >> that
>> > >> > > we will solve the memory problem later if it becomes necessary.
>> The
>> > >> > > experience leading to that situation would be quite bad: Imagine,
>> > you
>> > >> > > upgrade to AK 3.next, your tests pass, so you deploy to
>> production.
>> > >> That
>> > >> > > night, you get paged because your app is now crashing with
>> OOMEs. As
>> > >> with
>> > >> > > all OOMEs, you'll have a really hard time finding the root cause,
>> > and
>> > >> > once
>> > >> > > you do, you won't have a clear path to resolve the issue. You
>> could
>> > >> only
>> > >> > > tune down the commit interval and cache buffer size until you
>> stop
>> > >> > getting
>> > >> > > crashes.
>> > >> > >
>> > >> > > FYI, I know of multiple cases where people run EOS with much
>> larger
>> > >> > commit
>> > >> > > intervals to get better batching than the default, so I don't
>> think
>> > >> this
>> > >> > > pathological case would be as rare as you suspect.
>> > >> > >
>> > >> > > Given that we already have the rudiments of an idea of what we
>> could
>> > >> do
>> > >> > to
>> > >> > > prevent this downside, we should take the time to design a
>> solution.
>> > >> We
>> > >> > owe
>> > >> > > it to our users to ensure that awesome new features don't come
>> with
>> > >> > bitter
>> > >> > > pills unless we can't avoid it.
>> > >> > >
>> > >> > > 3. ALOS mode.
>> > >> > > On the other hand, I didn't see an indication of how stores will
>> be
>> > >> > > handled under ALOS (aka non-EOS) mode. Theoretically, the
>> > >> > transactionality
>> > >> > > of the store and the processing mode are orthogonal. A
>> transactional
>> > >> > store
>> > >> > > would serve ALOS just as well as a non-transactional one (if not
>> > >> better).
>> > >> > > Under ALOS, though, the default commit interval is five minutes,
>> so
>> > >> the
>> > >> > > memory issue is far more pressing.
>> > >> > >
>> > >> > > As I see it, we have several options to resolve this point. We
>> could
>> > >> > > demonstrate that transactional stores work just fine for ALOS
>> and we
>> > >> can
>> > >> > > therefore just swap over unconditionally. We could also disable
>> the
>> > >> > > transactional mechanism under ALOS so that stores operate just
>> the
>> > >> same
>> > >> > as
>> > >> > > they do today when run in ALOS mode. Finally, we could do the
>> same
>> > as
>> > >> in
>> > >> > > KIP-844 and make transactional stores opt-in (it'd be better to
>> > avoid
>> > >> the
>> > >> > > extra opt-in mechanism, but it's a good get-out-of-jail-free
>> card).
>> > >> > >
>> > >> > > 4. (minor point) Deprecation of methods
>> > >> > >
>> > >> > > You mentioned that the new `commit` method replaces flush,
>> > >> > > updateChangelogOffsets, and checkpoint. It seems to me that the
>> > point
>> > >> > about
>> > >> > > atomicity and Position also suggests that it replaces the
>> Position
>> > >> > > callbacks. However, the proposal only deprecates `flush`. Should
>> we
>> > be
>> > >> > > deprecating other methods as well?
>> > >> > >
>> > >> > > Thanks again for the KIP! It's really nice that you and Alex will
>> > get
>> > >> the
>> > >> > > chance to collaborate on both directions so that we can get the
>> best
>> > >> > > outcome for Streams and its users.
>> > >> > >
>> > >> > > -John
>> > >> > >
>> > >> > >
>> > >> > > On 2022/11/21 15:02:15 Nick Telford wrote:
>> > >> > > > Hi everyone,
>> > >> > > >
>> > >> > > > As I mentioned in the discussion thread for KIP-844, I've been
>> > >> working
>> > >> > on
>> > >> > > > an alternative approach to achieving better transactional
>> > semantics
>> > >> for
>> > >> > > > Kafka Streams StateStores.
>> > >> > > >
>> > >> > > > I've published this separately as KIP-892: Transactional
>> Semantics
>> > >> for
>> > >> > > > StateStores
>> > >> > > > <
>> > >> > >
>> > >> >
>> > >>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>> > >> > > >,
>> > >> > > > so that it can be discussed/reviewed separately from KIP-844.
>> > >> > > >
>> > >> > > > Alex: I'm especially interested in what you think!
>> > >> > > >
>> > >> > > > I have a nearly complete implementation of the changes
>> outlined in
>> > >> this
>> > >> > > > KIP, please let me know if you'd like me to push them for
>> review
>> > in
>> > >> > > advance
>> > >> > > > of a vote.
>> > >> > > >
>> > >> > > > Regards,
>> > >> > > >
>> > >> > > > Nick
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> >
>>
>

Reply via email to