Hi Nick,

Thanks for the insights! Very interesting!

As far as I understand, you want to atomically update the state store from the transaction buffer, flush the memtable of the state store, and write the checkpoint not when the commit interval elapses, but when the transaction buffer has reached a size that would lead to exceeding statestore.transaction.buffer.max.bytes before the next commit interval ends. That means the Kafka transaction would still commit every commit interval, but the state store would only be atomically updated roughly every statestore.transaction.buffer.max.bytes of data. IQ would then also only see new data at roughly that frequency. After a failure, the state store would need to restore up to statestore.transaction.buffer.max.bytes of data.

Is this correct?

What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (and thus flushing the memtable) only after a configured amount of data and/or number of commit intervals? That way, records would appear in output topics and IQ with the same delay, because both become visible when the Kafka transaction is committed. However, after a failure the state store would still need to restore up to statestore.transaction.buffer.max.bytes, and it might restore data that is already in the state store, because the checkpoint lags behind the last stable offset (i.e. the last committed offset) of the changelog topics. Restoring data that is already in the state store is idempotent, so EOS should not be violated. This solution needs at least one new config to specify when a checkpoint should be written.
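To make the alternative above concrete, here is a small sketch of such a checkpoint policy: the store is updated atomically on every commit, but a checkpoint (with the implied memtable flush) is only written once enough data or enough commit intervals have accumulated since the last one. All names here are illustrative assumptions, not actual Kafka Streams APIs, and the two thresholds stand in for the new config(s) mentioned above.

```java
// Hypothetical checkpoint policy: checkpoint after either a byte threshold
// or a number of commit intervals is reached, whichever comes first.
public class CheckpointPolicy {
    private final long maxBytesBetweenCheckpoints;
    private final int maxIntervalsBetweenCheckpoints;
    private long bytesSinceCheckpoint = 0;
    private int intervalsSinceCheckpoint = 0;

    public CheckpointPolicy(final long maxBytes, final int maxIntervals) {
        this.maxBytesBetweenCheckpoints = maxBytes;
        this.maxIntervalsBetweenCheckpoints = maxIntervals;
    }

    // Called once per commit interval, after the transaction buffer has
    // been atomically applied to the state store.
    public boolean shouldCheckpoint(final long bytesAppliedThisCommit) {
        bytesSinceCheckpoint += bytesAppliedThisCommit;
        intervalsSinceCheckpoint++;
        if (bytesSinceCheckpoint >= maxBytesBetweenCheckpoints
                || intervalsSinceCheckpoint >= maxIntervalsBetweenCheckpoints) {
            bytesSinceCheckpoint = 0;
            intervalsSinceCheckpoint = 0;
            return true;
        }
        return false;
    }
}
```

With this shape, the restore bound after a failure is the byte threshold plus whatever accumulated in the current window, which matches the trade-off described above.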



A small correction to your previous e-mail that does not change anything you said: Under ALOS the default commit interval is 30 seconds, not five seconds.


Best,
Bruno


On 01.07.23 12:37, Nick Telford wrote:
Hi everyone,

I've begun performance testing my branch on our staging environment,
putting it through its paces in our non-trivial application. I'm already
observing the same increased flush rate that we saw the last time we
attempted to use a version of this KIP, but this time, I think I know why.

Pre-KIP-892, StreamTask#postCommit, which is called at the end of the Task
commit process, has the following behaviour:

    - Under ALOS: checkpoint the state stores. This includes
    flushing memtables in RocksDB. This is acceptable because the default
    commit.interval.ms is 5 seconds, so forcibly flushing memtables every 5
    seconds is acceptable for most applications.
    - Under EOS: checkpointing is not done, *unless* it's being forced, due
    to e.g. the Task closing or being revoked. This means that under normal
    processing conditions, the state stores will not be checkpointed, and will
    not have memtables flushed at all, unless RocksDB decides to flush them on
    its own. Checkpointing stores and force-flushing their memtables is only
    done when a Task is being closed.
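The two bullets above can be sketched as a single decision, simplified from the StreamTask#postCommit behaviour described: this is an illustration of the branching only, not the actual Kafka Streams implementation.

```java
// Simplified sketch of pre-KIP-892 StreamTask#postCommit checkpointing:
// ALOS checkpoints (and flushes memtables) on every commit; EOS only
// checkpoints when forced, e.g. on Task close or revocation.
public class PostCommitSketch {
    enum ProcessingMode { ALOS, EOS }

    static boolean shouldCheckpoint(final ProcessingMode mode,
                                    final boolean enforceCheckpoint) {
        switch (mode) {
            case ALOS:
                return true;
            case EOS:
                return enforceCheckpoint;
            default:
                return false;
        }
    }
}
```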

Under EOS, KIP-892 needs to checkpoint stores on at least *some* normal
Task commits, in order to write the RocksDB transaction buffers to the
state stores, and to ensure the offsets are synced to disk to prevent
restores from getting out of hand. Consequently, my current implementation
calls maybeCheckpoint on *every* Task commit, which is far too frequent.
This causes checkpoints every 10,000 records, which is a change in flush
behaviour, potentially causing performance problems for some applications.

I'm looking into possible solutions, and I'm currently leaning towards
using the statestore.transaction.buffer.max.bytes configuration to
checkpoint Tasks once we are likely to exceed it. This would complement the
existing "early Task commit" functionality that this configuration
provides, in the following way:

    - Currently, we use statestore.transaction.buffer.max.bytes to force an
    early Task commit if processing more records would cause our state store
    transactions to exceed the memory assigned to them.
    - New functionality: when a Task *does* commit, we will not checkpoint
    the stores (and hence flush the transaction buffers) unless we expect to
    cross the statestore.transaction.buffer.max.bytes threshold before the next
    commit.
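A minimal sketch of the "new functionality" bullet above: on each Task commit, only checkpoint (and hence flush the transaction buffer) if the buffer is expected to cross statestore.transaction.buffer.max.bytes before the next commit. The estimation heuristic here (current size plus the bytes written during the last interval) is an assumption for illustration; the real prediction logic is an open implementation detail.

```java
// Hypothetical predicate: checkpoint now only if we expect the transaction
// buffer to exceed its configured maximum before the next commit completes.
public class TransactionBufferCheckpoint {
    static boolean shouldCheckpoint(final long currentBufferBytes,
                                    final long bytesWrittenLastInterval,
                                    final long maxBufferBytes) {
        // Assume the next interval writes roughly as much as the last one.
        return currentBufferBytes + bytesWrittenLastInterval >= maxBufferBytes;
    }
}
```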

I'm also open to suggestions.

Regards,
Nick

On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno!

3.
By "less predictable for users", I meant in terms of understanding the
performance profile under various circumstances. The more complex the
solution, the more difficult it would be for users to understand the
performance they see. For example, spilling records to disk when the
transaction buffer reaches a threshold would, I expect, reduce write
throughput. This reduction in write throughput could be unexpected, and
potentially difficult to diagnose/understand for users.
At the moment, I think the "early commit" concept is relatively
straightforward; it's easy to document, and conceptually fairly obvious to
users. We could probably add a metric to make it easier to understand when
it happens though.

3. (the second one)
The IsolationLevel is *essentially* an indirect way of telling StateStores
whether they should be transactional. READ_COMMITTED essentially requires
transactions, because it dictates that two threads calling
`newTransaction()` should not see writes from the other transaction until
they have been committed. With READ_UNCOMMITTED, all bets are off, and
stores can allow threads to observe written records at any time, which is
essentially "no transactions". That said, StateStores are free to implement
these guarantees however they can, which is a bit more relaxed than
dictating "you must use transactions". For example, with RocksDB we would
implement these as READ_COMMITTED == WBWI-based "transactions",
READ_UNCOMMITTED == direct writes to the database. But with other storage
engines, it might be preferable to *always* use transactions, even when
unnecessary; or there may be storage engines that don't provide
transactions, but the isolation guarantees can be met using a different
technique.
My idea was to try to keep the StateStore interface as loosely coupled
from the Streams engine as possible, to give implementers more freedom, and
reduce the amount of internal knowledge required.
That said, I understand that "IsolationLevel" might not be the right
abstraction, and we can always make it much more explicit if required, e.g.
boolean transactional()
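For the RocksDB mapping described above, the idea reduces to a very small rule; the enum and method names below are illustrative, not the KIP's final API, and WBWI refers to RocksDB's WriteBatchWithIndex.

```java
// Sketch: mapping the proposed IsolationLevel to a store's write path.
// READ_COMMITTED implies buffering writes in a WBWI "transaction";
// READ_UNCOMMITTED implies writing directly to the database.
public class IsolationMapping {
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    static boolean usesTransactionBuffer(final IsolationLevel level) {
        return level == IsolationLevel.READ_COMMITTED;
    }
}
```

Other storage engines are free to satisfy the same visibility guarantees differently, which is the loose coupling argued for above.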

7-8.
I can make these changes either later today or tomorrow.

Small update:
I've rebased my branch on trunk and fixed a bunch of issues that needed
addressing. Currently, all the tests pass, which is promising, but it will
need to undergo some performance testing. I haven't (yet) worked on
removing the `newTransaction()` stuff, but I would expect that,
behaviourally, it should make no difference. The branch is available at
https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is
interested in taking an early look.

Regards,
Nick

On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

1.
Yeah, I agree with you. That was actually also my point. I understood
that John was proposing the ingestion path as a way to avoid the early
commits. Probably, I misinterpreted the intent.

2.
I agree with John here, that actually it is public API. My question is
how this usage pattern affects normal processing.

3.
My concern is that checking for the size of the transaction buffer and
maybe triggering an early commit affects the whole processing of Kafka
Streams. The transactionality of a state store is not confined to the
state store itself, but spills over and changes the behavior of other
parts of the system. I agree with you that it is a decent compromise. I
just wanted to analyse the downsides and list the options to overcome
them. I also agree with you that all options seem quite heavy compared
with your KIP. I do not understand what you mean by "less predictable
for users", though.


I found the discussions about the alternatives really interesting. But I
also think that your plan sounds good and we should continue with it!


Some comments on your reply to my e-mail on June 20th:

3.
Ah, now, I understand the reasoning behind putting isolation level in
the state store context. Thanks! Should that also be a way to give the
state store the opportunity to decide whether to turn on
transactions or not?
With my comment, I was more concerned about how do you know if a
checkpoint file needs to be written under EOS, if you do not have a way
to know if the state store is transactional or not. If a state store is
transactional, the checkpoint file can be written during normal
processing under EOS. If the state store is not transactional, the
checkpoint file must not be written under EOS.
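The rule in the paragraph above is small enough to state as code; this is an illustrative sketch of the condition only, not a proposed API.

```java
// Sketch: under EOS, the checkpoint file may only be written during
// normal processing if the state store is transactional. Under ALOS
// (eosEnabled == false) it may always be written.
public class CheckpointRule {
    static boolean mayWriteCheckpointDuringProcessing(final boolean eosEnabled,
                                                      final boolean storeIsTransactional) {
        return !eosEnabled || storeIsTransactional;
    }
}
```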

7.
My point was about not only considering the bytes in memory in config
statestore.uncommitted.max.bytes, but also bytes that might be spilled
on disk. Basically, I was wondering whether you should remove the
"memory" in "Maximum number of memory bytes to be used to
buffer uncommitted state-store records." My thinking was that even if a
state store spills uncommitted bytes to disk, limiting the overall bytes
might make sense. Thinking about it again and considering the recent
discussions, it does not make too much sense anymore.
I like the name statestore.transaction.buffer.max.bytes that you proposed.

8.
A high-level description (without implementation details) of how Kafka
Streams will manage the commit of changelog transactions, state store
transactions and checkpointing would be great. Would be great if you
could also add some sentences about the behavior in case of a failure.
For instance how does a transactional state store recover after a
failure or what happens with the transaction buffer, etc. (that is what
I meant by "fail-over" in point 9.)

Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:
Hi Bruno,

1.
Isn't this exactly the same issue that WriteBatchWithIndex transactions
have, whereby exceeding (or likely to exceed) configured memory needs to
trigger an early commit?

2.
This is one of my big concerns. Ultimately, any approach based on
cracking open RocksDB internals and using it in ways it's not really
designed for is likely to have some unforeseen performance or
consistency issues.

3.
What's your motivation for removing these early commits? While not
ideal, I think they're a decent compromise to ensure consistency whilst
maintaining good and predictable performance.
All 3 of your suggested ideas seem *very* complicated, and might
actually make behaviour less predictable for users as a consequence.

I'm a bit concerned that the scope of this KIP is growing a bit out of
control. While it's good to discuss ideas for future improvements, I
think it's important to narrow the scope down to a design that achieves
the most pressing objectives (constant sized restorations during dirty
close/unexpected errors). Any design that this KIP produces can
ultimately be changed in the future, especially if the bulk of it is
internal behaviour.

I'm going to spend some time next week trying to re-work the original
WriteBatchWithIndex design to remove the newTransaction() method, such
that it's just an implementation detail of RocksDBStore. That way, if
we want to replace WBWI with something in the future, like the SST file
management outlined by John, then we can do so with little/no API
changes.

Regards,

Nick



