For now, I've settled on choosing an arbitrary default memory limit of
64MiB per-Task for buffering uncommitted records. I noticed that Kafka
Streams already provides some arbitrary default configuration of RocksDB
memory settings (i.e. memtable size etc.), and that many users will already
be explicitly configuring this for their purposes.

I think a further optimization for ALOS to only commit the StateStores when
exceeding this limit is reasonable, to preserve the user's desired
commit.interval.ms as much as possible.

On Mon, 28 Nov 2022 at 15:55, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> I've updated the KIP to resolve all the points that have been raised so
> far, with one exception: the ALOS default commit interval of 5 minutes is
> likely to cause WriteBatchWithIndex memory to grow too large.
>
> There's a couple of different things I can think of to solve this:
>
>    - We already have a memory/record limit in the KIP to prevent OOM
>    errors. Should we choose a default value for these? My concern here is that
>    anything we choose might seem rather arbitrary. We could change
>    its behaviour such that under ALOS, it only triggers the commit of the
>    StateStore, but under EOS, it triggers a commit of the Kafka transaction.
>    - We could introduce a separate `checkpoint.interval.ms` to allow ALOS
>    to commit the StateStores more frequently than the general
>    commit.interval.ms? My concern here is that the semantics of this
>    config would depend on the processing.mode; under ALOS it would allow more
>    frequently committing stores, whereas under EOS it couldn't.
>
> Any better ideas?
>
> On Wed, 23 Nov 2022 at 16:25, Nick Telford <nick.telf...@gmail.com> wrote:
>
>> Hi Alex,
>>
>> Thanks for the feedback.
>>
>> I've updated the discussion of OOM issues by describing how we'll handle
>> it. Here's the new text:
>>
>> To mitigate this, we will automatically force a Task commit if the total
>>> uncommitted records returned by
>>> StateStore#approximateNumUncommittedEntries()  exceeds a threshold,
>>> configured by max.uncommitted.state.entries.per.task; or the total
>>> memory used for buffering uncommitted records returned by
>>> StateStore#approximateNumUncommittedBytes() exceeds the threshold
>>> configured by max.uncommitted.state.bytes.per.task. This will roughly
>>> bound the memory required per-Task for buffering uncommitted records,
>>> irrespective of the commit.interval.ms, and will effectively bound the
>>> number of records that will need to be restored in the event of a failure.
>>>
>>
>>
>> These limits will be checked in StreamTask#process and a premature
>>> commit will be requested via Task#requestCommit().
>>>
>>
>>
>> Note that these new methods provide default implementations that ensure
>>> existing custom stores and non-transactional stores (e.g.
>>> InMemoryKeyValueStore) do not force any early commits.
>>
>>
>> I've chosen to have the StateStore expose approximations of its buffer
>> size/count instead of opaquely requesting a commit in order to delegate the
>> decision making to the Task itself. This enables Tasks to look at *all* of
>> their StateStores, and determine whether an early commit is necessary.
>> Notably, it enables pre-Task thresholds, instead of per-Store, which
>> prevents Tasks with many StateStores from using much more memory than Tasks
>> with one StateStore. This makes sense, since commits are done by-Task, not
>> by-Store.
>>
>> Prizes* for anyone who can come up with a better name for the new config
>> properties!
>>
>> Thanks for pointing out the potential performance issues of WBWI. From
>> the benchmarks that user posted[1], it looks like WBWI still performs
>> considerably better than individual puts, which is the existing design, so
>> I'd actually expect a performance boost from WBWI, just not as great as
>> we'd get from a plain WriteBatch. This does suggest that a good
>> optimization would be to use a regular WriteBatch for restoration (in
>> RocksDBStore#restoreBatch), since we know that those records will never be
>> queried before they're committed.
>>
>> 1:
>> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>>
>> * Just kidding, no prizes, sadly.
>>
>> On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov
>> <asorokou...@confluent.io.invalid> wrote:
>>
>>> Hey Nick,
>>>
>>> Thank you for the KIP! With such a significant performance degradation in
>>> the secondary store approach, we should definitely consider
>>> WriteBatchWithIndex. I also like encapsulating checkpointing inside the
>>> default state store implementation to improve performance.
>>>
>>> +1 to John's comment to keep the current checkpointing as a fallback
>>> mechanism. We want to keep existing users' workflows intact if we can. A
>>> non-intrusive way would be to add a separate StateStore method, say,
>>> StateStore#managesCheckpointing(), that controls whether the state store
>>> implementation owns checkpointing.
>>>
>>> I think that a solution to the transactional writes should address the
>>> OOMEs. One possible way to address that is to wire StateStore's commit
>>> request by adding, say, StateStore#commitNeeded that is checked in
>>> StreamTask#commitNeeded via the corresponding ProcessorStateManager. With
>>> that change, RocksDBStore will have to track the current transaction size
>>> and request a commit when the size goes over a (configurable) threshold.
>>>
>>> AFAIU WriteBatchWithIndex might perform significantly slower than non-txn
>>> puts as the batch size grows [1]. We should have a configuration to fall
>>> back to the current behavior (and/or disable txn stores for ALOS) unless
>>> the benchmarks show negligible overhead for longer commits / large-enough
>>> batch sizes.
>>>
>>> If you prefer to keep the KIP smaller, I would rather cut out
>>> state-store-managed checkpointing rather than proper OOMe handling and
>>> being able to switch to non-txn behavior. The checkpointing is not
>>> necessary to solve the recovery-under-EOS problem. On the other hand,
>>> once
>>> WriteBatchWithIndex is in, it will be much easier to add
>>> state-store-managed checkpointing.
>>>
>>> If you share the current implementation, I am happy to help you address
>>> the
>>> OOMe and configuration parts as well as review and test the patch.
>>>
>>> Best,
>>> Alex
>>>
>>>
>>> 1. https://github.com/facebook/rocksdb/issues/608
>>>
>>> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com>
>>> wrote:
>>>
>>> > Hi John,
>>> >
>>> > Thanks for the review and feedback!
>>> >
>>> > 1. Custom Stores: I've been mulling over this problem myself. As it
>>> stands,
>>> > custom stores would essentially lose checkpointing with no indication
>>> that
>>> > they're expected to make changes, besides a line in the release notes.
>>> I
>>> > agree that the best solution would be to provide a default that
>>> checkpoints
>>> > to a file. The one thing I would change is that the checkpointing is
>>> to a
>>> > store-local file, instead of a per-Task file. This way the StateStore
>>> still
>>> > technically owns its own checkpointing (via a default implementation),
>>> and
>>> > the StateManager/Task execution engine doesn't need to know anything
>>> about
>>> > checkpointing, which greatly simplifies some of the logic.
>>> >
>>> > 2. OOME errors: The main reasons why I didn't explore a solution to
>>> this is
>>> > a) to keep this KIP as simple as possible, and b) because I'm not
>>> exactly
>>> > how to signal that a Task should commit prematurely. I'm confident it's
>>> > possible, and I think it's worth adding a section on handling this.
>>> Besides
>>> > my proposal to force an early commit once memory usage reaches a
>>> threshold,
>>> > is there any other approach that you might suggest for tackling this
>>> > problem?
>>> >
>>> > 3. ALOS: I can add in an explicit paragraph, but my assumption is that
>>> > since transactional behaviour comes at little/no cost, that it should
>>> be
>>> > available by default on all stores, irrespective of the processing
>>> mode.
>>> > While ALOS doesn't use transactions, the Task itself still "commits",
>>> so
>>> > the behaviour should be correct under ALOS too. I'm not convinced that
>>> it's
>>> > worth having both transactional/non-transactional stores available, as
>>> it
>>> > would considerably increase the complexity of the codebase, for very
>>> little
>>> > benefit.
>>> >
>>> > 4. Method deprecation: Are you referring to StateStore#getPosition()?
>>> As I
>>> > understand it, Position contains the position of the *source* topics,
>>> > whereas the commit offsets would be the *changelog* offsets. So it's
>>> still
>>> > necessary to retain the Position data, as well as the changelog
>>> offsets.
>>> > What I meant in the KIP is that Position offsets are currently stored
>>> in a
>>> > file, and since we can atomically store metadata along with the record
>>> > batch we commit to RocksDB, we can move our Position offsets in to this
>>> > metadata too, and gain the same transactional guarantees that we will
>>> for
>>> > changelog offsets, ensuring that the Position offsets are consistent
>>> with
>>> > the records that are read from the database.
>>> >
>>> > Regards,
>>> > Nick
>>> >
>>> > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org>
>>> wrote:
>>> >
>>> > > Thanks for publishing this alternative, Nick!
>>> > >
>>> > > The benchmark you mentioned in the KIP-844 discussion seems like a
>>> > > compelling reason to revisit the built-in transactionality
>>> mechanism. I
>>> > > also appreciate you analysis, showing that for most use cases, the
>>> write
>>> > > batch approach should be just fine.
>>> > >
>>> > > There are a couple of points that would hold me back from approving
>>> this
>>> > > KIP right now:
>>> > >
>>> > > 1. Loss of coverage for custom stores.
>>> > > The fact that you can plug in a (relatively) simple implementation
>>> of the
>>> > > XStateStore interfaces and automagically get a distributed database
>>> out
>>> > of
>>> > > it is a significant benefit of Kafka Streams. I'd hate to lose it,
>>> so it
>>> > > would be better to spend some time and come up with a way to preserve
>>> > that
>>> > > property. For example, can we provide a default implementation of
>>> > > `commit(..)` that re-implements the existing checkpoint-file
>>> approach? Or
>>> > > perhaps add an `isTransactional()` flag to the state store interface
>>> so
>>> > > that the runtime can decide whether to continue to manage checkpoint
>>> > files
>>> > > vs delegating transactionality to the stores?
>>> > >
>>> > > 2. Guarding against OOME
>>> > > I appreciate your analysis, but I don't think it's sufficient to say
>>> that
>>> > > we will solve the memory problem later if it becomes necessary. The
>>> > > experience leading to that situation would be quite bad: Imagine, you
>>> > > upgrade to AK 3.next, your tests pass, so you deploy to production.
>>> That
>>> > > night, you get paged because your app is now crashing with OOMEs. As
>>> with
>>> > > all OOMEs, you'll have a really hard time finding the root cause, and
>>> > once
>>> > > you do, you won't have a clear path to resolve the issue. You could
>>> only
>>> > > tune down the commit interval and cache buffer size until you stop
>>> > getting
>>> > > crashes.
>>> > >
>>> > > FYI, I know of multiple cases where people run EOS with much larger
>>> > commit
>>> > > intervals to get better batching than the default, so I don't think
>>> this
>>> > > pathological case would be as rare as you suspect.
>>> > >
>>> > > Given that we already have the rudiments of an idea of what we could
>>> do
>>> > to
>>> > > prevent this downside, we should take the time to design a solution.
>>> We
>>> > owe
>>> > > it to our users to ensure that awesome new features don't come with
>>> > bitter
>>> > > pills unless we can't avoid it.
>>> > >
>>> > > 3. ALOS mode.
>>> > > On the other hand, I didn't see an indication of how stores will be
>>> > > handled under ALOS (aka non-EOS) mode. Theoretically, the
>>> > transactionality
>>> > > of the store and the processing mode are orthogonal. A transactional
>>> > store
>>> > > would serve ALOS just as well as a non-transactional one (if not
>>> better).
>>> > > Under ALOS, though, the default commit interval is five minutes, so
>>> the
>>> > > memory issue is far more pressing.
>>> > >
>>> > > As I see it, we have several options to resolve this point. We could
>>> > > demonstrate that transactional stores work just fine for ALOS and we
>>> can
>>> > > therefore just swap over unconditionally. We could also disable the
>>> > > transactional mechanism under ALOS so that stores operate just the
>>> same
>>> > as
>>> > > they do today when run in ALOS mode. Finally, we could do the same
>>> as in
>>> > > KIP-844 and make transactional stores opt-in (it'd be better to
>>> avoid the
>>> > > extra opt-in mechanism, but it's a good get-out-of-jail-free card).
>>> > >
>>> > > 4. (minor point) Deprecation of methods
>>> > >
>>> > > You mentioned that the new `commit` method replaces flush,
>>> > > updateChangelogOffsets, and checkpoint. It seems to me that the point
>>> > about
>>> > > atomicity and Position also suggests that it replaces the Position
>>> > > callbacks. However, the proposal only deprecates `flush`. Should we
>>> be
>>> > > deprecating other methods as well?
>>> > >
>>> > > Thanks again for the KIP! It's really nice that you and Alex will
>>> get the
>>> > > chance to collaborate on both directions so that we can get the best
>>> > > outcome for Streams and its users.
>>> > >
>>> > > -John
>>> > >
>>> > >
>>> > > On 2022/11/21 15:02:15 Nick Telford wrote:
>>> > > > Hi everyone,
>>> > > >
>>> > > > As I mentioned in the discussion thread for KIP-844, I've been
>>> working
>>> > on
>>> > > > an alternative approach to achieving better transactional
>>> semantics for
>>> > > > Kafka Streams StateStores.
>>> > > >
>>> > > > I've published this separately as KIP-892: Transactional Semantics
>>> for
>>> > > > StateStores
>>> > > > <
>>> > >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>>> > > >,
>>> > > > so that it can be discussed/reviewed separately from KIP-844.
>>> > > >
>>> > > > Alex: I'm especially interested in what you think!
>>> > > >
>>> > > > I have a nearly complete implementation of the changes outlined in
>>> this
>>> > > > KIP, please let me know if you'd like me to push them for review in
>>> > > advance
>>> > > > of a vote.
>>> > > >
>>> > > > Regards,
>>> > > >
>>> > > > Nick
>>> > > >
>>> > >
>>> >
>>>
>>

Reply via email to