Hi Alex,

Thanks for the feedback.

I've updated the discussion of OOM issues by describing how we'll handle
it. Here's the new text:

> To mitigate this, we will automatically force a Task commit if the total
> uncommitted records returned by
> StateStore#approximateNumUncommittedEntries() exceeds a threshold,
> configured by max.uncommitted.state.entries.per.task; or the total memory
> used for buffering uncommitted records returned by
> StateStore#approximateNumUncommittedBytes() exceeds the threshold
> configured by max.uncommitted.state.bytes.per.task. This will roughly
> bound the memory required per-Task for buffering uncommitted records,
> irrespective of commit.interval.ms, and will effectively bound the
> number of records that will need to be restored in the event of a failure.
>
> These limits will be checked in StreamTask#process, and a premature commit
> will be requested via Task#requestCommit().
>
> Note that these new methods provide default implementations that ensure
> existing custom stores and non-transactional stores (e.g.
> InMemoryKeyValueStore) do not force any early commits.
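
For reference, the new properties would be set like any other StreamsConfig
entry. The names are the KIP's current proposal (and might still change);
the values below are just illustrative, not proposed defaults:

```
# Force an early commit once a Task buffers more than 100,000 uncommitted
# records, or more than 64 MiB of uncommitted data, across all of its stores.
max.uncommitted.state.entries.per.task=100000
max.uncommitted.state.bytes.per.task=67108864
```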


I've chosen to have the StateStore expose approximations of its buffer
size/count instead of opaquely requesting a commit in order to delegate the
decision making to the Task itself. This enables Tasks to look at *all* of
their StateStores, and determine whether an early commit is necessary.
Notably, it enables per-Task thresholds, instead of per-Store, which
prevents Tasks with many StateStores from using much more memory than Tasks
with one StateStore. This makes sense, since commits are done by-Task, not
by-Store.
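
To make the shape of this concrete, here's a simplified sketch of the check.
The StateStore interface here is a heavily stripped-down stand-in for the
real one, and CommitThresholdSketch/commitNeeded are hypothetical names, not
actual Streams APIs:

```java
import java.util.List;

public class CommitThresholdSketch {

    // Hypothetical, stripped-down StateStore. The defaults return 0 so that
    // existing custom stores and non-transactional stores never trigger an
    // early commit.
    interface StateStore {
        default long approximateNumUncommittedEntries() { return 0L; }
        default long approximateNumUncommittedBytes() { return 0L; }
    }

    // The check that would run in StreamTask#process: sum across *all* of
    // the Task's stores, so the bound is per-Task rather than per-Store.
    static boolean commitNeeded(final List<StateStore> stores,
                                final long maxEntriesPerTask,
                                final long maxBytesPerTask) {
        long entries = 0;
        long bytes = 0;
        for (final StateStore store : stores) {
            entries += store.approximateNumUncommittedEntries();
            bytes += store.approximateNumUncommittedBytes();
        }
        return entries > maxEntriesPerTask || bytes > maxBytesPerTask;
    }
}
```

The key point is that the Task aggregates the approximations before
comparing against the thresholds, rather than each store deciding on its
own.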

Prizes* for anyone who can come up with a better name for the new config
properties!

Thanks for pointing out the potential performance issues of WBWI. From the
benchmarks that user posted[1], it looks like WBWI still performs
considerably better than individual puts, which is the existing design, so
I'd actually expect a performance boost from WBWI, just not as great as
we'd get from a plain WriteBatch. This does suggest that a good
optimization would be to use a regular WriteBatch for restoration (in
RocksDBStore#restoreBatch), since we know that those records will never be
queried before they're committed.
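
To illustrate the distinction, here's a toy sketch. The two batch classes
are simplified stand-ins for org.rocksdb.WriteBatch and
org.rocksdb.WriteBatchWithIndex (the real ones live in rocksdbjni), and
RestoreBatchSketch is purely illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreBatchSketch {

    // Stand-in for org.rocksdb.WriteBatch: buffers writes, no read access.
    static class WriteBatch {
        final List<byte[][]> ops = new ArrayList<>();
        void put(final byte[] key, final byte[] value) {
            ops.add(new byte[][]{key, value});
        }
    }

    // Stand-in for org.rocksdb.WriteBatchWithIndex: additionally maintains
    // a searchable index so uncommitted writes can be read back. That extra
    // per-put bookkeeping is the overhead the benchmark measures.
    static class WriteBatchWithIndex extends WriteBatch {
        final Map<String, byte[]> index = new HashMap<>();
        @Override
        void put(final byte[] key, final byte[] value) {
            super.put(key, value);
            index.put(new String(key), value); // extra work on every put
        }
        byte[] getFromBatch(final byte[] key) {
            return index.get(new String(key));
        }
    }

    // Restoration never reads records before they're committed, so a plain
    // WriteBatch (no index) suffices there:
    static WriteBatch restoreBatch(final List<byte[][]> records) {
        final WriteBatch batch = new WriteBatch();
        for (final byte[][] record : records) {
            batch.put(record[0], record[1]);
        }
        return batch;
    }
}
```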

1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results

* Just kidding, no prizes, sadly.

On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

> Hey Nick,
>
> Thank you for the KIP! With such a significant performance degradation in
> the secondary store approach, we should definitely consider
> WriteBatchWithIndex. I also like encapsulating checkpointing inside the
> default state store implementation to improve performance.
>
> +1 to John's comment to keep the current checkpointing as a fallback
> mechanism. We want to keep existing users' workflows intact if we can. A
> non-intrusive way would be to add a separate StateStore method, say,
> StateStore#managesCheckpointing(), that controls whether the state store
> implementation owns checkpointing.
>
> I think that a solution to the transactional writes should address the
> OOMEs. One possible way to address that is to wire StateStore's commit
> request by adding, say, StateStore#commitNeeded that is checked in
> StreamTask#commitNeeded via the corresponding ProcessorStateManager. With
> that change, RocksDBStore will have to track the current transaction size
> and request a commit when the size goes over a (configurable) threshold.
>
> AFAIU WriteBatchWithIndex might perform significantly slower than non-txn
> puts as the batch size grows [1]. We should have a configuration to fall
> back to the current behavior (and/or disable txn stores for ALOS) unless
> the benchmarks show negligible overhead for longer commits / large-enough
> batch sizes.
>
> If you prefer to keep the KIP smaller, I would rather cut out
> state-store-managed checkpointing rather than proper OOMe handling and
> being able to switch to non-txn behavior. The checkpointing is not
> necessary to solve the recovery-under-EOS problem. On the other hand, once
> WriteBatchWithIndex is in, it will be much easier to add
> state-store-managed checkpointing.
>
> If you share the current implementation, I am happy to help you address the
> OOMe and configuration parts as well as review and test the patch.
>
> Best,
> Alex
>
>
> 1. https://github.com/facebook/rocksdb/issues/608
>
> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com>
> wrote:
>
> > Hi John,
> >
> > Thanks for the review and feedback!
> >
> > 1. Custom Stores: I've been mulling over this problem myself. As it
> stands,
> > custom stores would essentially lose checkpointing with no indication
> that
> > they're expected to make changes, besides a line in the release notes. I
> > agree that the best solution would be to provide a default that
> checkpoints
> > to a file. The one thing I would change is that the checkpointing is to a
> > store-local file, instead of a per-Task file. This way the StateStore
> still
> > technically owns its own checkpointing (via a default implementation),
> and
> > the StateManager/Task execution engine doesn't need to know anything
> about
> > checkpointing, which greatly simplifies some of the logic.
> >
> > 2. OOME errors: The main reasons why I didn't explore a solution to this are
> > a) to keep this KIP as simple as possible, and b) because I'm not exactly
> > sure how to signal that a Task should commit prematurely. I'm confident it's
> > possible, and I think it's worth adding a section on handling this.
> Besides
> > my proposal to force an early commit once memory usage reaches a
> threshold,
> > is there any other approach that you might suggest for tackling this
> > problem?
> >
> > 3. ALOS: I can add in an explicit paragraph, but my assumption is that
> > since transactional behaviour comes at little/no cost, that it should be
> > available by default on all stores, irrespective of the processing mode.
> > While ALOS doesn't use transactions, the Task itself still "commits", so
> > the behaviour should be correct under ALOS too. I'm not convinced that
> it's
> > worth having both transactional/non-transactional stores available, as it
> > would considerably increase the complexity of the codebase, for very
> little
> > benefit.
> >
> > 4. Method deprecation: Are you referring to StateStore#getPosition()? As
> I
> > understand it, Position contains the position of the *source* topics,
> > whereas the commit offsets would be the *changelog* offsets. So it's
> still
> > necessary to retain the Position data, as well as the changelog offsets.
> > What I meant in the KIP is that Position offsets are currently stored in
> a
> > file, and since we can atomically store metadata along with the record
> > batch we commit to RocksDB, we can move our Position offsets in to this
> > metadata too, and gain the same transactional guarantees that we will for
> > changelog offsets, ensuring that the Position offsets are consistent with
> > the records that are read from the database.
> >
> > Regards,
> > Nick
> >
> > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks for publishing this alternative, Nick!
> > >
> > > The benchmark you mentioned in the KIP-844 discussion seems like a
> > > compelling reason to revisit the built-in transactionality mechanism. I
> > > also appreciate your analysis, showing that for most use cases, the
> > > write batch approach should be just fine.
> > >
> > > There are a couple of points that would hold me back from approving
> this
> > > KIP right now:
> > >
> > > 1. Loss of coverage for custom stores.
> > > The fact that you can plug in a (relatively) simple implementation of
> the
> > > XStateStore interfaces and automagically get a distributed database out
> > of
> > > it is a significant benefit of Kafka Streams. I'd hate to lose it, so
> it
> > > would be better to spend some time and come up with a way to preserve
> > that
> > > property. For example, can we provide a default implementation of
> > > `commit(..)` that re-implements the existing checkpoint-file approach?
> Or
> > > perhaps add an `isTransactional()` flag to the state store interface so
> > > that the runtime can decide whether to continue to manage checkpoint
> > files
> > > vs delegating transactionality to the stores?
> > >
> > > 2. Guarding against OOME
> > > I appreciate your analysis, but I don't think it's sufficient to say
> that
> > > we will solve the memory problem later if it becomes necessary. The
> > > experience leading to that situation would be quite bad: Imagine, you
> > > upgrade to AK 3.next, your tests pass, so you deploy to production.
> That
> > > night, you get paged because your app is now crashing with OOMEs. As
> with
> > > all OOMEs, you'll have a really hard time finding the root cause, and
> > once
> > > you do, you won't have a clear path to resolve the issue. You could
> only
> > > tune down the commit interval and cache buffer size until you stop
> > getting
> > > crashes.
> > >
> > > FYI, I know of multiple cases where people run EOS with much larger
> > commit
> > > intervals to get better batching than the default, so I don't think
> this
> > > pathological case would be as rare as you suspect.
> > >
> > > Given that we already have the rudiments of an idea of what we could do
> > to
> > > prevent this downside, we should take the time to design a solution. We
> > owe
> > > it to our users to ensure that awesome new features don't come with
> > bitter
> > > pills unless we can't avoid it.
> > >
> > > 3. ALOS mode.
> > > On the other hand, I didn't see an indication of how stores will be
> > > handled under ALOS (aka non-EOS) mode. Theoretically, the
> > transactionality
> > > of the store and the processing mode are orthogonal. A transactional
> > store
> > > would serve ALOS just as well as a non-transactional one (if not
> better).
> > > Under ALOS, though, the default commit interval is five minutes, so the
> > > memory issue is far more pressing.
> > >
> > > As I see it, we have several options to resolve this point. We could
> > > demonstrate that transactional stores work just fine for ALOS and we
> can
> > > therefore just swap over unconditionally. We could also disable the
> > > transactional mechanism under ALOS so that stores operate just the same
> > as
> > > they do today when run in ALOS mode. Finally, we could do the same as
> in
> > > KIP-844 and make transactional stores opt-in (it'd be better to avoid
> the
> > > extra opt-in mechanism, but it's a good get-out-of-jail-free card).
> > >
> > > 4. (minor point) Deprecation of methods
> > >
> > > You mentioned that the new `commit` method replaces flush,
> > > updateChangelogOffsets, and checkpoint. It seems to me that the point
> > about
> > > atomicity and Position also suggests that it replaces the Position
> > > callbacks. However, the proposal only deprecates `flush`. Should we be
> > > deprecating other methods as well?
> > >
> > > Thanks again for the KIP! It's really nice that you and Alex will get
> the
> > > chance to collaborate on both directions so that we can get the best
> > > outcome for Streams and its users.
> > >
> > > -John
> > >
> > >
> > > On 2022/11/21 15:02:15 Nick Telford wrote:
> > > > Hi everyone,
> > > >
> > > > As I mentioned in the discussion thread for KIP-844, I've been
> working
> > on
> > > > an alternative approach to achieving better transactional semantics
> for
> > > > Kafka Streams StateStores.
> > > >
> > > > I've published this separately as KIP-892: Transactional Semantics
> for
> > > > StateStores
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>,
> > > > so that it can be discussed/reviewed separately from KIP-844.
> > > >
> > > > Alex: I'm especially interested in what you think!
> > > >
> > > > I have a nearly complete implementation of the changes outlined in
> this
> > > > KIP, please let me know if you'd like me to push them for review in
> > > advance
> > > > of a vote.
> > > >
> > > > Regards,
> > > >
> > > > Nick
> > > >
> > >
> >
>
