Hi everyone,

I've updated the KIP (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)
with the latest changes; mostly bringing back "Atomic Checkpointing" (for
what feels like the 10th time!). I think the one thing missing is some
changes to metrics (notably the store "flush" metrics will need to be
renamed to "commit").

The reason I brought back Atomic Checkpointing was to decouple store flush
from store commit. This is important, because with Transactional
StateStores, we now need to call "flush" on *every* Task commit, and not
just when the StateStore is closing; otherwise, our transaction buffer would
never be written and persisted, and would instead grow without bound! I experimented
with some simple solutions, like forcing a store flush whenever the
transaction buffer was likely to exceed its configured size, but this was
brittle: it prevented the transaction buffer from being configured to be
unbounded, and it still would have required explicit flushes of RocksDB,
yielding sub-optimal performance and memory utilization.
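
To illustrate why that heuristic was brittle, this is roughly the kind of
check I was experimenting with (the names here are hypothetical, not code
from the branch):

    // Hypothetical sketch of the rejected heuristic: force a memtable flush on
    // Task commit only when the transaction buffer looks likely to exceed its
    // configured maximum before the next commit.
    static boolean shouldForceFlush(final long uncommittedBytes,
                                    final long bytesWrittenLastInterval,
                                    final long maxBufferBytes) {
        if (maxBufferBytes == Long.MAX_VALUE) {
            // an "unbounded" buffer can never trip this check, so it would grow forever
            return false;
        }
        // assume the next commit interval writes roughly as much as the last one
        return uncommittedBytes + bytesWrittenLastInterval > maxBufferBytes;
    }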

I deemed Atomic Checkpointing to be the "right" way to resolve this
problem. By ensuring that the changelog offsets that correspond to the most
recently written records are always atomically written to the StateStore
(by writing them to the same transaction buffer), we can avoid forcibly
flushing the RocksDB memtables to disk, letting RocksDB flush them only
when necessary, without losing any of our consistency guarantees. See the
updated KIP for more info.
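
To make the "same transaction buffer" idea concrete, the commit path could
look something like the sketch below. The offsets column family, key encoding
and method names are assumptions for illustration only; the KIP describes the
actual design:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    // Sketch only: append the changelog offsets to the same WriteBatchWithIndex
    // that buffers the uncommitted records, so records and offsets reach RocksDB
    // in a single atomic write and RocksDB is left to flush memtables on its own.
    final class AtomicCheckpointSketch {
        private AtomicCheckpointSketch() {}

        static void commit(final RocksDB db,
                           final ColumnFamilyHandle offsetsColumnFamily, // assumed dedicated CF
                           final WriteBatchWithIndex transactionBuffer,
                           final Map<TopicPartition, Long> changelogOffsets)
                throws RocksDBException {
            for (final Map.Entry<TopicPartition, Long> offset : changelogOffsets.entrySet()) {
                final byte[] key = offset.getKey().toString().getBytes(StandardCharsets.UTF_8);
                final byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(offset.getValue()).array();
                transactionBuffer.put(offsetsColumnFamily, key, value);
            }
            try (final WriteOptions writeOptions = new WriteOptions()) {
                // one write persists records and offsets together; no forced flush needed
                db.write(writeOptions, transactionBuffer);
            }
        }
    }

In this sketch, the offsets read back from the store are, by construction,
consistent with whatever data RocksDB last persisted, so restoration only
needs to replay the changelog from those offsets.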

I have fully implemented these changes, although I'm still not entirely
happy with the implementation for segmented StateStores, so I plan to
refactor that. Despite that, all tests pass. If you'd like to try out or
review this highly experimental and incomplete branch, it's available here:
https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's built
against Kafka 3.5.0 so that I have a stable base to build and test it on,
and to enable easy apples-to-apples comparisons in a live environment. I
plan to rebase it against trunk once it's nearer completion and has been
proven on our main application.

I would really appreciate help in reviewing and testing:
- Segmented (Versioned, Session and Window) stores
- Global stores

I do not currently use either of these, so my primary test environment
doesn't exercise these areas.

I'm going on Parental Leave starting next week for a few weeks, so will not
have time to move this forward until late August. That said, your feedback
is welcome and appreciated, I just won't be able to respond as quickly as
usual.

Regards,
Nick

On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Bruno
>
> Yes, that's correct, although the impact on IQ is not something I had
> considered.
>
> What about atomically updating the state store from the transaction
>> buffer every commit interval and writing the checkpoint (thus, flushing
>> the memtable) every configured amount of data and/or number of commit
>> intervals?
>>
>
> I'm not quite sure I follow. Are you suggesting that we add an additional
> config for the max number of commit intervals between checkpoints? That
> way, we would checkpoint *either* when the transaction buffers are nearly
> full, *OR* whenever a certain number of commit intervals have elapsed,
> whichever comes first?
>
> That certainly seems reasonable, although this re-ignites an earlier
> debate about whether a config should be measured in "number of commit
> intervals", instead of just an absolute time.
>
> FWIW, I realised that this issue is the reason I was pursuing the Atomic
> Checkpoints, as it de-couples memtable flush from checkpointing, which
> enables us to just checkpoint on every commit without any performance
> impact. Atomic Checkpointing is definitely the "best" solution, but I'm not
> sure if this is enough to bring it back into this KIP.
>
> I'm currently working on moving all the transactional logic directly into
> RocksDBStore itself, which does away with the StateStore#newTransaction
> method, and reduces the number of new classes introduced, significantly
> reducing the complexity. If it works, and the complexity is drastically
> reduced, I may try bringing back Atomic Checkpoints into this KIP.
>
> Regards,
> Nick
>
> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> wrote:
>
>> Hi Nick,
>>
>> Thanks for the insights! Very interesting!
>>
>> As far as I understand, you want to atomically update the state store
>> from the transaction buffer, flush the memtable of a state store and
>> write the checkpoint not after the commit time elapsed but after the
>> transaction buffer reached a size that would lead to exceeding
>> statestore.transaction.buffer.max.bytes before the next commit interval
>> ends.
>> That means, the Kafka transaction would commit every commit interval but
>> the state store will only be atomically updated roughly every
>> statestore.transaction.buffer.max.bytes of data. Also IQ would then only
>> see new data roughly every statestore.transaction.buffer.max.bytes.
>> After a failure the state store needs to restore up to
>> statestore.transaction.buffer.max.bytes.
>>
>> Is this correct?
>>
>> What about atomically updating the state store from the transaction
>> buffer every commit interval and writing the checkpoint (thus, flushing
>> the memtable) every configured amount of data and/or number of commit
>> intervals? In such a way, we would have the same delay for records
>> appearing in output topics and IQ because both would appear when the
>> Kafka transaction is committed. However, after a failure the state store
>> still needs to restore up to statestore.transaction.buffer.max.bytes and
>> it might restore data that is already in the state store because the
>> checkpoint lags behind the last stable offset (i.e. the last committed
>> offset) of the changelog topics. Restoring data that is already in the
>> state store is idempotent, so eos should not be violated.
>> This solution needs at least one new config to specify when a checkpoint
>> should be written.
>>
>>
>>
>> A small correction to your previous e-mail that does not change anything
>> you said: Under alos the default commit interval is 30 seconds, not five
>> seconds.
>>
>>
>> Best,
>> Bruno
>>
>>
>> On 01.07.23 12:37, Nick Telford wrote:
>> > Hi everyone,
>> >
>> > I've begun performance testing my branch on our staging environment,
>> > putting it through its paces in our non-trivial application. I'm already
>> > observing the same increased flush rate that we saw the last time we
>> > attempted to use a version of this KIP, but this time, I think I know
>> why.
>> >
>> > Pre-KIP-892, StreamTask#postCommit, which is called at the end of the
>> Task
>> > commit process, has the following behaviour:
>> >
>> >     - Under ALOS: checkpoint the state stores. This includes
>> >     flushing memtables in RocksDB. This is acceptable because the
>> default
>> >     commit.interval.ms is 5 seconds, so forcibly flushing memtables
>> every 5
>> >     seconds is acceptable for most applications.
>> >     - Under EOS: checkpointing is not done, *unless* it's being forced,
>> due
>> >     to e.g. the Task closing or being revoked. This means that under
>> normal
>> >     processing conditions, the state stores will not be checkpointed,
>> and will
>> >     not have memtables flushed at all, unless RocksDB decides to flush
>> them on
>> >     its own. Checkpointing stores and force-flushing their memtables is
>> only
>> >     done when a Task is being closed.
>> >
>> > Under EOS, KIP-892 needs to checkpoint stores on at least *some* normal
>> > Task commits, in order to write the RocksDB transaction buffers to the
>> > state stores, and to ensure the offsets are synced to disk to prevent
>> > restores from getting out of hand. Consequently, my current
>> implementation
>> > calls maybeCheckpoint on *every* Task commit, which is far too frequent.
>> > This causes checkpoints every 10,000 records, which is a change in flush
>> > behaviour, potentially causing performance problems for some
>> applications.
>> >
>> > I'm looking into possible solutions, and I'm currently leaning towards
>> > using the statestore.transaction.buffer.max.bytes configuration to
>> > checkpoint Tasks once we are likely to exceed it. This would complement
>> the
>> > existing "early Task commit" functionality that this configuration
>> > provides, in the following way:
>> >
>> >     - Currently, we use statestore.transaction.buffer.max.bytes to
>> force an
>> >     early Task commit if processing more records would cause our state
>> store
>> >     transactions to exceed the memory assigned to them.
>> >     - New functionality: when a Task *does* commit, we will not
>> checkpoint
>> >     the stores (and hence flush the transaction buffers) unless we
>> expect to
>> >     cross the statestore.transaction.buffer.max.bytes threshold before
>> the next
>> >     commit
>> >
>> > I'm also open to suggestions.
>> >
>> > Regards,
>> > Nick
>> >
>> > On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com>
>> wrote:
>> >
>> >> Hi Bruno!
>> >>
>> >> 3.
>> >> By "less predictable for users", I meant in terms of understanding the
>> >> performance profile under various circumstances. The more complex the
>> >> solution, the more difficult it would be for users to understand the
>> >> performance they see. For example, spilling records to disk when the
>> >> transaction buffer reaches a threshold would, I expect, reduce write
>> >> throughput. This reduction in write throughput could be unexpected, and
>> >> potentially difficult to diagnose/understand for users.
>> >> At the moment, I think the "early commit" concept is relatively
>> >> straightforward; it's easy to document, and conceptually fairly
>> obvious to
>> >> users. We could probably add a metric to make it easier to understand
>> when
>> >> it happens though.
>> >>
>> >> 3. (the second one)
>> >> The IsolationLevel is *essentially* an indirect way of telling
>> StateStores
>> >> whether they should be transactional. READ_COMMITTED essentially
>> requires
>> >> transactions, because it dictates that two threads calling
>> >> `newTransaction()` should not see writes from the other transaction
>> until
>> >> they have been committed. With READ_UNCOMMITTED, all bets are off, and
>> >> stores can allow threads to observe written records at any time, which
>> is
>> >> essentially "no transactions". That said, StateStores are free to
>> implement
>> >> these guarantees however they can, which is a bit more relaxed than
>> >> dictating "you must use transactions". For example, with RocksDB we
>> would
>> >> implement these as READ_COMMITTED == WBWI-based "transactions",
>> >> READ_UNCOMMITTED == direct writes to the database. But with other
>> storage
>> >> engines, it might be preferable to *always* use transactions, even when
>> >> unnecessary; or there may be storage engines that don't provide
>> >> transactions, but the isolation guarantees can be met using a different
>> >> technique.
>> >> My idea was to try to keep the StateStore interface as loosely coupled
>> >> from the Streams engine as possible, to give implementers more
>> freedom, and
>> >> reduce the amount of internal knowledge required.
>> >> That said, I understand that "IsolationLevel" might not be the right
>> >> abstraction, and we can always make it much more explicit if required,
>> e.g.
>> >> boolean transactional()
>> >>
>> >> 7-8.
>> >> I can make these changes either later today or tomorrow.
>> >>
>> >> Small update:
>> >> I've rebased my branch on trunk and fixed a bunch of issues that needed
>> >> addressing. Currently, all the tests pass, which is promising, but it
>> will
>> >> need to undergo some performance testing. I haven't (yet) worked on
>> >> removing the `newTransaction()` stuff, but I would expect that,
>> >> behaviourally, it should make no difference. The branch is available at
>> >> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is
>> >> interested in taking an early look.
>> >>
>> >> Regards,
>> >> Nick
>> >>
>> >> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org>
>> wrote:
>> >>
>> >>> Hi Nick,
>> >>>
>> >>> 1.
>> >>> Yeah, I agree with you. That was actually also my point. I understood
>> >>> that John was proposing the ingestion path as a way to avoid the early
>> >>> commits. Probably, I misinterpreted the intent.
>> >>>
>> >>> 2.
>> >>> I agree with John here, that actually it is public API. My question is
>> >>> how this usage pattern affects normal processing.
>> >>>
>> >>> 3.
>> >>> My concern is that checking for the size of the transaction buffer and
>> >>> maybe triggering an early commit affects the whole processing of Kafka
>> >>> Streams. The transactionality of a state store is not confined to the
>> >>> state store itself, but spills over and changes the behavior of other
>> >>> parts of the system. I agree with you that it is a decent compromise.
>> I
>> >>> just wanted to analyse the downsides and list the options to overcome
>> >>> them. I also agree with you that all options seem quite heavy compared
>> >>> with your KIP. I do not understand what you mean with "less
>> predictable
>> >>> for users", though.
>> >>>
>> >>>
>> >>> I found the discussions about the alternatives really interesting.
>> But I
>> >>> also think that your plan sounds good and we should continue with it!
>> >>>
>> >>>
>> >>> Some comments on your reply to my e-mail on June 20th:
>> >>>
>> >>> 3.
>> >>> Ah, now, I understand the reasoning behind putting isolation level in
>> >>> the state store context. Thanks! Should that also be a way to give
>> >>> the state store the opportunity to decide whether to turn on
>> >>> transactions or not?
>> >>> With my comment, I was more concerned about how do you know if a
>> >>> checkpoint file needs to be written under EOS, if you do not have a
>> way
>> >>> to know if the state store is transactional or not. If a state store
>> is
>> >>> transactional, the checkpoint file can be written during normal
>> >>> processing under EOS. If the state store is not transactional, the
>> >>> checkpoint file must not be written under EOS.
>> >>>
>> >>> 7.
>> >>> My point was about not only considering the bytes in memory in config
>> >>> statestore.uncommitted.max.bytes, but also bytes that might be spilled
>> >>> on disk. Basically, I was wondering whether you should remove the
>> >>> "memory" in "Maximum number of memory bytes to be used to
>> >>> buffer uncommitted state-store records." My thinking was that even if
>> a
>> >>> state store spills uncommitted bytes to disk, limiting the overall
>> bytes
>> >>> might make sense. Thinking about it again and considering the recent
>> >>> discussions, it does not make too much sense anymore.
>> >>> I like the name statestore.transaction.buffer.max.bytes that you
>> proposed.
>> >>>
>> >>> 8.
>> >>> A high-level description (without implementation details) of how Kafka
>> >>> Streams will manage the commit of changelog transactions, state store
>> >>> transactions and checkpointing would be great. Would be great if you
>> >>> could also add some sentences about the behavior in case of a failure.
>> >>> For instance how does a transactional state store recover after a
>> >>> failure or what happens with the transaction buffer, etc. (that is
>> what
>> >>> I meant by "fail-over" in point 9.)
>> >>>
>> >>> Best,
>> >>> Bruno
>> >>>
>> >>> On 21.06.23 18:50, Nick Telford wrote:
>> >>>> Hi Bruno,
>> >>>>
>> >>>> 1.
>> >>>> Isn't this exactly the same issue that WriteBatchWithIndex
>> transactions
>> >>>> have, whereby exceeding (or likely to exceed) configured memory
>> needs to
>> >>>> trigger an early commit?
>> >>>>
>> >>>> 2.
>> >>>> This is one of my big concerns. Ultimately, any approach based on
>> >>> cracking
>> >>>> open RocksDB internals and using it in ways it's not really designed
>> >>> for is
>> >>>> likely to have some unforeseen performance or consistency issues.
>> >>>>
>> >>>> 3.
>> >>>> What's your motivation for removing these early commits? While not
>> >>> ideal, I
>> >>>> think they're a decent compromise to ensure consistency whilst
>> >>> maintaining
>> >>>> good and predictable performance.
>> >>>> All 3 of your suggested ideas seem *very* complicated, and might
>> >>> actually
>> >>>> make behaviour less predictable for users as a consequence.
>> >>>>
>> >>>> I'm a bit concerned that the scope of this KIP is growing a bit out
>> of
>> >>>> control. While it's good to discuss ideas for future improvements, I
>> >>> think
>> >>>> it's important to narrow the scope down to a design that achieves the
>> >>> most
>> >>>> pressing objectives (constant sized restorations during dirty
>> >>>> close/unexpected errors). Any design that this KIP produces can
>> >>> ultimately
>> >>>> be changed in the future, especially if the bulk of it is internal
>> >>>> behaviour.
>> >>>>
>> >>>> I'm going to spend some time next week trying to re-work the original
>> >>>> WriteBatchWithIndex design to remove the newTransaction() method,
>> such
>> >>> that
>> >>>> it's just an implementation detail of RocksDBStore. That way, if we
>> >>> want to
>> >>>> replace WBWI with something in the future, like the SST file
>> management
>> >>>> outlined by John, then we can do so with little/no API changes.
>> >>>>
>> >>>> Regards,
>> >>>>
>> >>>> Nick
>> >>>>
>> >>>
>> >>
>> >
>>
>
