Hi all,

Out of curiosity, why does the performance of the store degrade so
significantly with the 844 implementation? I wouldn't be too surprised by a
50-60% drop (caused by each record being written twice), but 96% is extreme.
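
To put rough numbers on that, here is a back-of-the-envelope sketch (the class and method names are made up for illustration, and nothing here is measured): a fractional throughput drop d corresponds to a slowdown factor of 1 / (1 - d).

```java
// Back-of-the-envelope only: converts a fractional throughput drop into the
// equivalent slowdown factor. All names are illustrative.
final class ThroughputDrop {
    // drop is a fraction in [0, 1), e.g. 0.96 for a 96% drop
    static double slowdownFactor(double drop) {
        if (drop < 0.0 || drop >= 1.0) {
            throw new IllegalArgumentException("drop must be in [0, 1)");
        }
        return 1.0 / (1.0 - drop);
    }

    public static void main(String[] args) {
        System.out.println("50% drop -> factor " + slowdownFactor(0.50));
        System.out.println("96% drop -> factor " + slowdownFactor(0.96));
    }
}
```

Writing each record twice would account for roughly a 2x slowdown, whereas a 96% drop works out to about 25x, which is why it looks like something beyond the double write.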

The only thing I can think of which could create such a bottleneck would be
that perhaps the 844 implementation deserializes and then re-serializes the
store values when copying from the uncommitted to committed store, but I
wasn't able to figure that out when I scanned the PR.

Colt McNealy
*Founder, LittleHorse.io*


On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> I've updated the KIP to resolve all the points that have been raised so
> far, with one exception: the ALOS default commit interval of 30 seconds is
> likely to cause WriteBatchWithIndex memory to grow too large.
>
> There are a couple of different things I can think of to solve this:
>
>    - We already have a memory/record limit in the KIP to prevent OOM
>    errors. Should we choose a default value for these? My concern here is
>    that anything we choose might seem rather arbitrary. We could change
>    its behaviour such that under ALOS, it only triggers the commit of the
>    StateStore, but under EOS, it triggers a commit of the Kafka
>    transaction.
>    - We could introduce a separate `checkpoint.interval.ms` to allow ALOS
>    to commit the StateStores more frequently than the general
>    commit.interval.ms? My concern here is that the semantics of this
>    config would depend on the processing.guarantee; under ALOS it would
>    allow committing stores more frequently, whereas under EOS it couldn't.
>
> Any better ideas?
>
> On Wed, 23 Nov 2022 at 16:25, Nick Telford <nick.telf...@gmail.com> wrote:
>
> > Hi Alex,
> >
> > Thanks for the feedback.
> >
> > I've updated the discussion of OOM issues by describing how we'll handle
> > it. Here's the new text:
> >
> > To mitigate this, we will automatically force a Task commit if the total
> >> uncommitted records returned by
> >> StateStore#approximateNumUncommittedEntries()  exceeds a threshold,
> >> configured by max.uncommitted.state.entries.per.task; or the total
> >> memory used for buffering uncommitted records returned by
> >> StateStore#approximateNumUncommittedBytes() exceeds the threshold
> >> configured by max.uncommitted.state.bytes.per.task. This will roughly
> >> bound the memory required per-Task for buffering uncommitted records,
> >> irrespective of the commit.interval.ms, and will effectively bound the
> >> number of records that will need to be restored in the event of a
> >> failure.
> >>
> >
> >
> > These limits will be checked in StreamTask#process and a premature commit
> >> will be requested via Task#requestCommit().
> >>
> >
> >
> > Note that these new methods provide default implementations that ensure
> >> existing custom stores and non-transactional stores (e.g.
> >> InMemoryKeyValueStore) do not force any early commits.
> >
> >
> > I've chosen to have the StateStore expose approximations of its buffer
> > size/count instead of opaquely requesting a commit in order to delegate
> > the decision making to the Task itself. This enables Tasks to look at
> > *all* of their StateStores, and determine whether an early commit is
> > necessary. Notably, it enables per-Task thresholds, instead of per-Store,
> > which prevents Tasks with many StateStores from using much more memory
> > than Tasks with one StateStore. This makes sense, since commits are done
> > by-Task, not by-Store.
> >
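
A minimal sketch of the per-Task check described above, assuming stand-in types rather than the real Kafka Streams classes. `UncommittedSizeAware`, `FixedSizeStore` and `TaskCommitCheck` are hypothetical names; only the two `approximate...` method names and the two config names come from the quoted KIP text, and having the defaults return 0 is an assumption about how "no early commits" would be achieved.

```java
import java.util.List;

// Hypothetical stand-in for the two StateStore methods quoted above.
// Returning 0 from the defaults is an assumption about how stores that do
// not buffer writes would avoid triggering early commits.
interface UncommittedSizeAware {
    default long approximateNumUncommittedEntries() { return 0L; }
    default long approximateNumUncommittedBytes() { return 0L; }
}

// Illustrative store that reports fixed sizes; not part of the KIP.
final class FixedSizeStore implements UncommittedSizeAware {
    private final long entries;
    private final long bytes;

    FixedSizeStore(long entries, long bytes) {
        this.entries = entries;
        this.bytes = bytes;
    }

    @Override public long approximateNumUncommittedEntries() { return entries; }
    @Override public long approximateNumUncommittedBytes() { return bytes; }
}

final class TaskCommitCheck {
    private final long maxEntries; // stands in for max.uncommitted.state.entries.per.task
    private final long maxBytes;   // stands in for max.uncommitted.state.bytes.per.task

    TaskCommitCheck(long maxEntries, long maxBytes) {
        this.maxEntries = maxEntries;
        this.maxBytes = maxBytes;
    }

    // Sums across every store owned by the Task, so the limits apply
    // per-Task rather than per-Store.
    boolean earlyCommitNeeded(List<? extends UncommittedSizeAware> stores) {
        long entries = 0L;
        long bytes = 0L;
        for (UncommittedSizeAware store : stores) {
            entries += store.approximateNumUncommittedEntries();
            bytes += store.approximateNumUncommittedBytes();
        }
        return entries > maxEntries || bytes > maxBytes;
    }
}
```

With a limit of 100 entries, two stores holding 60 uncommitted entries each would trip the check even though neither exceeds the limit alone, which is the point of summing at the Task level.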
> > Prizes* for anyone who can come up with a better name for the new config
> > properties!
> >
> > Thanks for pointing out the potential performance issues of WBWI. From
> > the benchmarks that user posted[1], it looks like WBWI still performs
> > considerably better than individual puts, which is the existing design,
> > so I'd actually expect a performance boost from WBWI, just not as great
> > as we'd get from a plain WriteBatch. This does suggest that a good
> > optimization would be to use a regular WriteBatch for restoration (in
> > RocksDBStore#restoreBatch), since we know that those records will never
> > be queried before they're committed.
> >
> > 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> >
> > * Just kidding, no prizes, sadly.
> >
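
The distinction drawn above between an indexed batch (uncommitted writes stay readable) and a plain batch (write-only until commit) can be modelled with a toy store. This is only a sketch using java.util collections, not the RocksDB API; `ToyTransactionalStore` and its methods are hypothetical, and deletes are left out for brevity.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: a committed map plus an indexed buffer of uncommitted
// writes. It imitates the read-your-own-writes idea, not RocksDB itself.
final class ToyTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    // The sorted map plays the role of the index over uncommitted writes.
    private final TreeMap<String, String> pending = new TreeMap<>();

    void put(String key, String value) {
        pending.put(key, value);
    }

    // Reads consult the uncommitted buffer first, then the committed data.
    String get(String key) {
        String buffered = pending.get(key);
        return buffered != null ? buffered : committed.get(key);
    }

    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }

    // Restoration path: nothing reads these records before they are applied,
    // so an unindexed list is enough and no lookup structure is maintained.
    void restoreBatch(List<? extends Map.Entry<String, String>> records) {
        for (Map.Entry<String, String> record : records) {
            committed.put(record.getKey(), record.getValue());
        }
    }
}
```

The extra cost of the indexed path is keeping `pending` searchable on every put; the restore path skips that, which is the optimization being suggested.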
> > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov
> > <asorokou...@confluent.io.invalid> wrote:
> >
> >> Hey Nick,
> >>
> >> Thank you for the KIP! With such a significant performance degradation
> >> in the secondary store approach, we should definitely consider
> >> WriteBatchWithIndex. I also like encapsulating checkpointing inside the
> >> default state store implementation to improve performance.
> >>
> >> +1 to John's comment to keep the current checkpointing as a fallback
> >> mechanism. We want to keep existing users' workflows intact if we can. A
> >> non-intrusive way would be to add a separate StateStore method, say,
> >> StateStore#managesCheckpointing(), that controls whether the state store
> >> implementation owns checkpointing.
> >>
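
One way to picture the fallback suggested above, as a sketch under stated assumptions: `CheckpointAwareStore` and `CheckpointPlanner` are hypothetical stand-ins, only the name `managesCheckpointing()` comes from the message, and defaulting it to false (so existing custom stores keep the current checkpoint-file behaviour) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down store interface. Defaulting to false is an
// assumption: stores that do not override it keep runtime-managed checkpoints.
interface CheckpointAwareStore {
    String name();

    default boolean managesCheckpointing() {
        return false;
    }
}

final class CheckpointPlanner {
    // Names of the stores for which the runtime would still have to write
    // a checkpoint file, because the store does not own checkpointing.
    static List<String> storesNeedingCheckpointFile(List<? extends CheckpointAwareStore> stores) {
        List<String> names = new ArrayList<>();
        for (CheckpointAwareStore store : stores) {
            if (!store.managesCheckpointing()) {
                names.add(store.name());
            }
        }
        return names;
    }
}
```

A store that overrides the method to return true would be skipped by the runtime, while an untouched custom store would continue to get a checkpoint file without any code changes.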
> >> I think that a solution to the transactional writes should address the
> >> OOMEs. One possible way to address that is to wire StateStore's commit
> >> request by adding, say, StateStore#commitNeeded that is checked in
> >> StreamTask#commitNeeded via the corresponding ProcessorStateManager.
> >> With that change, RocksDBStore will have to track the current
> >> transaction size and request a commit when the size goes over a
> >> (configurable) threshold.
> >>
> >> AFAIU WriteBatchWithIndex might perform significantly slower than
> >> non-txn puts as the batch size grows [1]. We should have a configuration
> >> to fall back to the current behavior (and/or disable txn stores for
> >> ALOS) unless the benchmarks show negligible overhead for longer commits
> >> / large-enough batch sizes.
> >>
> >> If you prefer to keep the KIP smaller, I would cut out
> >> state-store-managed checkpointing rather than proper OOME handling and
> >> the ability to switch to non-txn behavior. The checkpointing is not
> >> necessary to solve the recovery-under-EOS problem. On the other hand,
> >> once WriteBatchWithIndex is in, it will be much easier to add
> >> state-store-managed checkpointing.
> >>
> >> If you share the current implementation, I am happy to help you address
> >> the OOME and configuration parts as well as review and test the patch.
> >>
> >> Best,
> >> Alex
> >>
> >>
> >> 1. https://github.com/facebook/rocksdb/issues/608
> >>
> >> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com>
> >> wrote:
> >>
> >> > Hi John,
> >> >
> >> > Thanks for the review and feedback!
> >> >
> >> > 1. Custom Stores: I've been mulling over this problem myself. As it
> >> > stands, custom stores would essentially lose checkpointing with no
> >> > indication that they're expected to make changes, besides a line in the
> >> > release notes. I agree that the best solution would be to provide a
> >> > default that checkpoints to a file. The one thing I would change is that
> >> > the checkpointing is to a store-local file, instead of a per-Task file.
> >> > This way the StateStore still technically owns its own checkpointing
> >> > (via a default implementation), and the StateManager/Task execution
> >> > engine doesn't need to know anything about checkpointing, which greatly
> >> > simplifies some of the logic.
> >> >
> >> > 2. OOMEs: The main reasons why I didn't explore a solution to this are
> >> > a) to keep this KIP as simple as possible, and b) because I'm not
> >> > exactly sure how to signal that a Task should commit prematurely. I'm
> >> > confident it's possible, and I think it's worth adding a section on
> >> > handling this. Besides my proposal to force an early commit once memory
> >> > usage reaches a threshold, is there any other approach that you might
> >> > suggest for tackling this problem?
> >> >
> >> > 3. ALOS: I can add in an explicit paragraph, but my assumption is that
> >> > since transactional behaviour comes at little/no cost, it should be
> >> > available by default on all stores, irrespective of the processing
> >> > mode. While ALOS doesn't use transactions, the Task itself still
> >> > "commits", so the behaviour should be correct under ALOS too. I'm not
> >> > convinced that it's worth having both transactional/non-transactional
> >> > stores available, as it would considerably increase the complexity of
> >> > the codebase, for very little benefit.
> >> >
> >> > 4. Method deprecation: Are you referring to StateStore#getPosition()?
> >> > As I understand it, Position contains the position of the *source*
> >> > topics, whereas the commit offsets would be the *changelog* offsets. So
> >> > it's still necessary to retain the Position data, as well as the
> >> > changelog offsets. What I meant in the KIP is that Position offsets are
> >> > currently stored in a file, and since we can atomically store metadata
> >> > along with the record batch we commit to RocksDB, we can move our
> >> > Position offsets into this metadata too, and gain the same
> >> > transactional guarantees that we will for changelog offsets, ensuring
> >> > that the Position offsets are consistent with the records that are read
> >> > from the database.
> >> >
> >> > Regards,
> >> > Nick
> >> >
> >> > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org>
> >> > wrote:
> >> >
> >> > > Thanks for publishing this alternative, Nick!
> >> > >
> >> > > The benchmark you mentioned in the KIP-844 discussion seems like a
> >> > > compelling reason to revisit the built-in transactionality
> >> > > mechanism. I also appreciate your analysis, showing that for most use
> >> > > cases, the write batch approach should be just fine.
> >> > >
> >> > > There are a couple of points that would hold me back from approving
> >> > > this KIP right now:
> >> > >
> >> > > 1. Loss of coverage for custom stores.
> >> > > The fact that you can plug in a (relatively) simple implementation of
> >> > > the XStateStore interfaces and automagically get a distributed
> >> > > database out of it is a significant benefit of Kafka Streams. I'd hate
> >> > > to lose it, so it would be better to spend some time and come up with
> >> > > a way to preserve that property. For example, can we provide a default
> >> > > implementation of `commit(..)` that re-implements the existing
> >> > > checkpoint-file approach? Or perhaps add an `isTransactional()` flag
> >> > > to the state store interface so that the runtime can decide whether to
> >> > > continue to manage checkpoint files vs delegating transactionality to
> >> > > the stores?
> >> > >
> >> > > 2. Guarding against OOME
> >> > > I appreciate your analysis, but I don't think it's sufficient to say
> >> > > that we will solve the memory problem later if it becomes necessary.
> >> > > The experience leading to that situation would be quite bad: Imagine,
> >> > > you upgrade to AK 3.next, your tests pass, so you deploy to
> >> > > production. That night, you get paged because your app is now
> >> > > crashing with OOMEs. As with all OOMEs, you'll have a really hard time
> >> > > finding the root cause, and once you do, you won't have a clear path
> >> > > to resolve the issue. You could only tune down the commit interval and
> >> > > cache buffer size until you stop getting crashes.
> >> > >
> >> > > FYI, I know of multiple cases where people run EOS with much larger
> >> > > commit intervals to get better batching than the default, so I don't
> >> > > think this pathological case would be as rare as you suspect.
> >> > >
> >> > > Given that we already have the rudiments of an idea of what we could
> >> > > do to prevent this downside, we should take the time to design a
> >> > > solution. We owe it to our users to ensure that awesome new features
> >> > > don't come with bitter pills unless we can't avoid it.
> >> > >
> >> > > 3. ALOS mode.
> >> > > On the other hand, I didn't see an indication of how stores will be
> >> > > handled under ALOS (aka non-EOS) mode. Theoretically, the
> >> > > transactionality of the store and the processing mode are orthogonal.
> >> > > A transactional store would serve ALOS just as well as a
> >> > > non-transactional one (if not better). Under ALOS, though, the default
> >> > > commit interval is 30 seconds, so the memory issue is far more
> >> > > pressing.
> >> > >
> >> > > As I see it, we have several options to resolve this point. We could
> >> > > demonstrate that transactional stores work just fine for ALOS and we
> >> > > can therefore just swap over unconditionally. We could also disable
> >> > > the transactional mechanism under ALOS so that stores operate just the
> >> > > same as they do today when run in ALOS mode. Finally, we could do the
> >> > > same as in KIP-844 and make transactional stores opt-in (it'd be
> >> > > better to avoid the extra opt-in mechanism, but it's a good
> >> > > get-out-of-jail-free card).
> >> > >
> >> > > 4. (minor point) Deprecation of methods
> >> > >
> >> > > You mentioned that the new `commit` method replaces flush,
> >> > > updateChangelogOffsets, and checkpoint. It seems to me that the point
> >> > > about atomicity and Position also suggests that it replaces the
> >> > > Position callbacks. However, the proposal only deprecates `flush`.
> >> > > Should we be deprecating other methods as well?
> >> > >
> >> > > Thanks again for the KIP! It's really nice that you and Alex will get
> >> > > the chance to collaborate on both directions so that we can get the
> >> > > best outcome for Streams and its users.
> >> > >
> >> > > -John
> >> > >
> >> > >
> >> > > On 2022/11/21 15:02:15 Nick Telford wrote:
> >> > > > Hi everyone,
> >> > > >
> >> > > > As I mentioned in the discussion thread for KIP-844, I've been
> >> > > > working on an alternative approach to achieving better transactional
> >> > > > semantics for Kafka Streams StateStores.
> >> > > >
> >> > > > I've published this separately as KIP-892: Transactional Semantics
> >> > > > for StateStores
> >> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>,
> >> > > > so that it can be discussed/reviewed separately from KIP-844.
> >> > > >
> >> > > > Alex: I'm especially interested in what you think!
> >> > > >
> >> > > > I have a nearly complete implementation of the changes outlined in
> >> > > > this KIP, please let me know if you'd like me to push them for review
> >> > > > in advance of a vote.
> >> > > >
> >> > > > Regards,
> >> > > >
> >> > > > Nick
