Hi Nick,

What you and Lucas wrote about the different configurations of ALOS/EOS and READ_COMMITTED/READ_UNCOMMITTED makes sense to me. My earlier concerns about changelogs diverging from the content of the local state stores turned out not to apply. So I think we can move on with those configurations.

Regarding the TaskCorruptedException and wiping out the state stores under EOS, couldn't we abort the transaction on the state store and close the task dirty? If the Kafka transaction was indeed committed, the store would restore the missing part from the changelog topic. If the Kafka transaction was not committed, changelog topic and state store are in-sync.

In any case, IMO those are implementation details that we do not need to discuss and solve in the KIP discussion. We can solve them on the PR. The important thing is that the processing guarantees hold.

Best,
Bruno

On 10/18/23 3:56 PM, Nick Telford wrote:
Hi Lucas,

TaskCorruptedException is how Streams signals that the Task state needs to
be wiped, so we can't retain that exception without also wiping state on
timeouts.

Regards,
Nick

On Wed, 18 Oct 2023 at 14:48, Lucas Brutschy <lbruts...@confluent.io.invalid>
wrote:

Hi Nick,

I think indeed the better behavior would be to retry commitTransaction
until we risk running out of time to meet `max.poll.interval.ms`.

However, if it's handled as a `TaskCorruptedException` at the moment,
I would do the same in this KIP, and leave exception handling
improvements to future work. This KIP is already improving the
situation a lot by not wiping the state store.

Cheers,
Lucas

On Tue, Oct 17, 2023 at 3:51 PM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi Lucas,

Yeah, this is pretty much the direction I'm thinking of going in now. You
make an interesting point about committing on-error under
ALOS/READ_COMMITTED, although I haven't had a chance to think through the
implications yet.

Something that I ran into earlier this week is an issue with the new
handling of TimeoutException. Without TX stores, TimeoutException under
EOS
throws a TaskCorruptedException, which wipes the stores. However, with TX
stores, TimeoutException is now just bubbled up and dealt with as it is
under ALOS. The problem arises when the Producer#commitTransaction call
times out: Streams attempts to ignore the error and continue producing,
which causes the next call to Producer#send to throw
"IllegalStateException: Cannot attempt operation `send` because the
previous call to `commitTransaction` timed out and must be retried".

I'm not sure what we should do here: retrying the commitTransaction seems
logical, but what if it times out again? Where do we draw the line and
shutdown the instance?

Regards,
Nick

On Mon, 16 Oct 2023 at 13:19, Lucas Brutschy <lbruts...@confluent.io
.invalid>
wrote:

Hi all,

I think I liked your suggestion of allowing EOS with READ_UNCOMMITTED,
but keep wiping the state on error, and I'd vote for this solution
when introducing `default.state.isolation.level`. This way, we'd have
the most low-risk roll-out of this feature (no behavior change without
reconfiguration), with the possibility of switching to the most sane /
battle-tested default settings in 4.0. Essentially, we'd have a
feature flag but call it `default.state.isolation.level` and don't
have to deprecate it later.

So the possible configurations would then be this:

1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
reads from DB.
2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
WriteBatch/DB. Flush on error (see note below).
3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
reads from DB. Wipe state on error.
4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
WriteBatch/DB.
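
(For illustration only: configuration (4) above might be selected like this, assuming the new config keeps the proposed name default.state.isolation.level and accepts the IsolationLevel names as values; there is no StreamsConfig constant for it yet, so the raw string is used.)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class Kip892ConfigSketch {
    public static Properties eosWithReadCommittedIq() {
        final Properties props = new Properties();
        // Existing config: EOS processing.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Proposed by this KIP; name and value are the proposal under discussion, not released API.
        props.put("default.state.isolation.level", "READ_COMMITTED");
        return props;
    }
}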

I believe the feature is important enough that we will see good
adoption even without changing the default. In 4.0, once it has been
adopted and battle-tested, we can make READ_COMMITTED the default for
EOS, or even READ_COMMITTED the default everywhere, depending on our
experiences. And we could add a clever implementation of
READ_UNCOMMITTED with WriteBatches later.

The only smell here is that `default.state.isolation.level` wouldn't
be purely an IQ setting, but it would also (slightly) change the
behavior of the processing, but that seems unavoidable as long as we
haven't solved READ_UNCOMMITTED IQ with WriteBatches.

Minor: As for Bruno's point 4, I think if we are concerned about this
behavior (we don't necessarily have to be, because it doesn't violate
ALOS guarantees as far as I can see), we could make
ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMMITTED by flushing
the WriteBatch on error (obviously, only if we have a chance to do
that).

Cheers,
Lucas

On Mon, Oct 16, 2023 at 12:19 PM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi Guozhang,

The KIP as it stands introduces a new configuration,
default.state.isolation.level, which is independent of processing.mode.
It's intended that this new configuration be used to configure a global IQ
isolation level in the short term, with a future KIP introducing the
capability to change the isolation level on a per-query basis, falling back
to the "default" defined by this config. That's why I called it "default",
for future-proofing.

However, it currently includes the caveat that READ_UNCOMMITTED is
not
available under EOS. I think this is the coupling you are alluding
to?

This isn't intended to be a restriction of the API, but is currently
a
technical limitation. However, after discussing with some users about
use-cases that would require READ_UNCOMMITTED under EOS, I'm
inclined to
remove that clause and put in the necessary work to make that
combination
possible now.

I currently see two possible approaches:

    1. Disable TX StateStores internally when the IsolationLevel is
    READ_UNCOMMITTED and the processing.mode is EOS. This is more difficult
    than it sounds, as there are many assumptions being made throughout the
    internals about the guarantees StateStores provide. It would definitely
    add a lot of extra "if (read_uncommitted && eos)" branches, complicating
    maintenance and testing.
    2. Invest the time *now* to make READ_UNCOMMITTED of EOS StateStores
    possible. I have some ideas on how this could be achieved, but they would
    need testing and could introduce some additional issues. The benefit of
    this approach is that it would make query-time IsolationLevels much
    simpler to implement in the future.

Unfortunately, both will require considerable work that will further
delay
this KIP, which was the reason I placed the restriction in the KIP
in the
first place.

Regards,
Nick

On Sat, 14 Oct 2023 at 03:30, Guozhang Wang <
guozhang.wang...@gmail.com>
wrote:

Hello Nick,

First of all, thanks a lot for the great effort you've put in
driving
this KIP! I really like it coming through finally, as many people
in
the community have raised this. At the same time I honestly feel a
bit
ashamed for not putting enough of my time supporting it and
pushing it
through the finish line (you raised this KIP almost a year ago).

I briefly passed through the DISCUSS thread so far, not sure I've
100
percent digested all the bullet points. But with the goal of
trying to
help take it through the finish line in mind, I'd want to throw
thoughts on top of my head only on the point #4 above which I felt
may
be the main hurdle for the current KIP to drive to a consensus now.

The general question I asked myself is, whether we want to couple
"IQ
reading mode" with "processing mode". While technically I tend to
agree with you that, it's feels like a bug if some single user
chose
"EOS" for processing mode while choosing "read uncommitted" for IQ
reading mode, at the same time, I'm thinking if it's possible that
there could be two different persons (or even two teams) that
would be
using the stream API to build the app, and the IQ API to query the
running state of the app. I know this is less of a technical thing
but
rather a design question, but if it could ever be the case, I'm
wondering whether the person using the IQ API knows about the risks of
using read uncommitted but still chose it in favor of
performance, no matter whether the underlying stream processing mode
configured by another person is EOS or not. In that regard, I'm
leaning towards a "leaving the door open, and close it later if we
found it's a bad idea" approach, with a configuration that we can
potentially deprecate later, rather than "shut the door, clean for everyone".
More
specifically, allowing the processing mode / IQ read mode to be
decoupled, and if we found that there's no such cases as I
speculated
above or people started complaining a lot, we can still enforce
coupling them.

Again, just my 2c here. Thanks again for the great patience and
diligence on this KIP.


Guozhang



On Fri, Oct 13, 2023 at 8:48 AM Nick Telford <
nick.telf...@gmail.com>
wrote:

Hi Bruno,

4.
I'll hold off on making that change until we have a consensus as
to
what
configuration to use to control all of this, as it'll be
affected by
the
decision on EOS isolation levels.

5.
Done. I've chosen "committedOffsets".

Regards,
Nick

On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna <cado...@apache.org>
wrote:

Hi Nick,

1.
Yeah, you are probably right that it does not make too much
sense.
Thanks for the clarification!


4.
Yes, sorry for the back and forth, but I think for the sake of
the
KIP
it is better to let the ALOS behavior as it is for now due to
the
possible issues you would run into. Maybe we can find a
solution
in the
future. Now the question returns to whether we really need
default.state.isolation.level. Maybe the config could be the
feature
flag Sophie requested.


5.
There is a guideline in Kafka not to use the get prefix for
getters (at
least in the public API). Thus, could you please rename

getCommittedOffset(TopicPartition partition) ->
committedOffsetFor(TopicPartition partition)

You can also propose an alternative to committedOffsetFor().
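
(For concreteness, a possible shape of the renamed accessor; the containing interface name here is made up, and the OptionalLong return type follows Lucas' suggestion further down this thread to avoid returning null:)

import java.util.OptionalLong;
import org.apache.kafka.common.TopicPartition;

public interface OffsetManagingStore {
    // The changelog offset most recently committed by this store for the given
    // partition, or empty if the store has not committed an offset for it yet.
    OptionalLong committedOffsetFor(TopicPartition partition);
}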


Best,
Bruno


On 10/13/23 3:21 PM, Nick Telford wrote:
Hi Bruno,

Thanks for getting back to me.

1.
I think this should be possible. Are you thinking of the
situation
where
a
user may downgrade to a previous version of Kafka Streams? In
that
case,
sadly, the RocksDBStore would get wiped by the older version
of
Kafka
Streams anyway, because that version wouldn't understand the
extra
column
family (that holds offsets), so the missing Position file
would
automatically get rebuilt when the store is rebuilt from the
changelog.
Are there other situations than downgrade where a
transactional
store
could
be replaced by a non-transactional one? I can't think of any.

2.
Ahh yes, the Test Plan - my Kryptonite! This section
definitely
needs to
be
fleshed out. I'll work on that. How much detail do you need?

3.
See my previous email discussing this.

4.
Hmm, this is an interesting point. Are you suggesting that
under
ALOS
READ_COMMITTED should not be supported?

Regards,
Nick

On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna <
cado...@apache.org>
wrote:

Hi Nick,

I think the KIP is converging!


1.
I am wondering whether it makes sense to write the position
file
during
close as we do for the checkpoint file, so that in case the
state
store
is replaced with a non-transactional state store the
non-transactional
state store finds the position file. I think, this is not
strictly
needed, but would be a nice behavior instead of just
deleting
the
position file.


2.
The test plan does not mention integration tests. Do you not
need to
extend existing ones and add new ones? Also for upgrading
and
downgrading you might need integration and/or system tests.


3.
I think Sophie made a point. Although, IQ reading from
uncommitted
data
under EOS might be considered a bug by some people. Thus,
your
KIP
would
fix a bug rather than changing the intended behavior.
However, I
also
see that a feature flag would help users that rely on this
buggy
behavior (at least until AK 4.0).


4.
This is related to the previous point. I assume that the
difference
between READ_COMMITTED and READ_UNCOMMITTED for ALOS is
that in
the
former you enable transactions on the state store and in the
latter
you
disable them. If my assumption is correct, I think that is
an
issue.
Let's assume under ALOS Streams fails over a couple of times
more or
less at the same step in processing after value 3 is added
to an
aggregation but the offset of the corresponding input record
was not
committed. With transactions disabled, the aggregation value would
increase by 3 for each failover. With transactions enabled,
value 3
would only be added to the aggregation once when the offset
of
the
input
record is committed and the transaction finally completes.
So
the
content of the state store would change depending on the
configuration
for IQ. IMO, the content of the state store should be
independent
from
IQ. Given this issue, I propose to not use transactions with
ALOS at
all. I was a big proponent of using transactions with ALOS,
but
I
realized that transactions with ALOS is not as easy as
enabling
transactions on state stores. Another aspect that is
problematic is
that
the changelog topic which actually replicates the state
store
is not
transactional under ALOS. Thus, it might happen that the
state
store and
the changelog differ in their content. All of this is maybe
solvable
somehow, but for the sake of this KIP, I would leave it for
the
future.


Best,
Bruno



On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote:
Hey Nick! First of all thanks for taking up this awesome
feature,
I'm
sure
every single
Kafka Streams user and dev would agree that it is sorely
needed.

I've just been catching up on the KIP and surrounding
discussion,
so
please
forgive me
for any misunderstandings or misinterpretations of the
current
plan and
don't hesitate to
correct me.

Before I jump in, I just want to say that having seen this
drag on
for
so
long, my singular
goal in responding is to help this KIP past a perceived
impasse so
we
can
finally move on
to voting and implementing it. Long discussions are to be
expected
for
major features like
this but it's completely on us as the Streams devs to make
sure
there
is
an
end in sight
for any ongoing discussion.

With that said, it's my understanding that the KIP as
currently
proposed
is
just not tenable
for Kafka Streams, and would prevent some EOS users from
upgrading
to
the
version it
first appears in. Given that we can't predict or guarantee
whether
any
of
the followup KIPs
would be completed in the same release cycle as this one,
we
need
to
make
sure that the
feature is either compatible with all current users or else
feature-flagged
so that they may
opt in/out.

Therefore, IIUC we need to have either (or both) of these
as
fully-implemented config options:
1. default.state.isolation.level
2. enable.transactional.state.stores

This way EOS users for whom read_committed semantics are
not
viable can
still upgrade,
and either use the isolation.level config to leverage the
new
txn
state
stores without sacrificing
their application semantics, or else simply keep the
transactional
state
stores disabled until we
are able to fully implement the isolation level
configuration
at
either
an
application or query level.

Frankly you are the expert here and know much more about
the
tradeoffs
in
both semantics and
effort level of implementing one of these configs vs the
other. In
my
opinion, either option would
be fine and I would leave the decision of which one to
include
in
this
KIP
completely up to you.
I just don't see a way for the KIP to proceed without some
variation of
the
above that would allow
EOS users to opt-out of read_committed.

(If it's all the same to you, I would recommend always
including a
feature
flag in large structural
changes like this. No matter how much I trust someone or
myself to
implement a feature, you just
never know what kind of bugs might slip in, especially
with the
very
first
iteration that gets released.
So personally, my choice would be to add the feature flag
and
leave it
off
by default. If all goes well
you can do a quick KIP to enable it by default as soon as
the
isolation.level config has been
completed. But feel free to just pick whichever option is
easiest
or
quickest for you to implement)

Hope this helps move the discussion forward,
Sophie

On Tue, Sep 19, 2023 at 1:57 AM Nick Telford <
nick.telf...@gmail.com>
wrote:

Hi Bruno,

Agreed, I can live with that for now.

In an effort to keep the scope of this KIP from
expanding, I'm
leaning
towards just providing a configurable
default.state.isolation.level
and
removing IsolationLevel from the StateStoreContext. This
would be
compatible with adding support for query-time
IsolationLevels
in
the
future, whilst providing a way for users to select an
isolation
level
now.

The big problem with this, however, is that if a user
selects
processing.mode
= "exactly-once(-v2|-beta)", and
default.state.isolation.level =
"READ_UNCOMMITTED", we need to guarantee that the data
isn't
written
to
disk until commit() is called, but we also need to permit
IQ
threads
to
read from the ongoing transaction.

A simple solution would be to (temporarily) forbid this
combination of
configuration, and have default.state.isolation.level
automatically
switch
to READ_COMMITTED when processing.mode is anything other
than
at-least-once. Do you think this would be acceptable?

In a later KIP, we can add support for query-time
isolation
levels and
solve this particular problem there, which would relax
this
restriction.

Regards,
Nick

On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <
cado...@apache.org>
wrote:

Why do we need to add READ_COMMITTED to
InMemoryKeyValueStore? I
think
it is perfectly valid to say InMemoryKeyValueStore do not
support
READ_COMMITTED for now, since READ_UNCOMMITTED is the
de-facto
default
at the moment.

Best,
Bruno

On 9/18/23 7:12 PM, Nick Telford wrote:
Oh! One other concern I haven't mentioned: if we make
IsolationLevel a
query-time constraint, then we need to add support for
READ_COMMITTED
to
InMemoryKeyValueStore too, which will require some
changes
to
the
implementation.

On Mon, 18 Sept 2023 at 17:24, Nick Telford <
nick.telf...@gmail.com

wrote:

Hi everyone,

I agree that having IsolationLevel be determined at
query-time
is
the
ideal design, but there are a few sticking points:

1.
There needs to be some way to communicate the
IsolationLevel
down
to
the
RocksDBStore itself, so that the query can respect it.
Since
stores
are
"layered" in functionality (i.e. ChangeLoggingStore,
MeteredStore,
etc.),
we need some way to deliver that information to the
bottom
layer.
For
IQv2,
we can use the existing State#query() method, but IQv1
has
no
way
to
do
this.

A simple approach, which would potentially open up
other
options,
would
be
to add something like: ReadOnlyKeyValueStore<K, V>
readOnlyView(IsolationLevel isolationLevel) to
ReadOnlyKeyValueStore
(and
similar to ReadOnlyWindowStore, ReadOnlySessionStore,
etc.).

2.
As mentioned above, RocksDB WriteBatches are not
thread-safe,
which
causes
a problem if we want to provide READ_UNCOMMITTED
Iterators. I
also
had a
look at RocksDB Transactions[1], but they solve a very
different
problem,
and have the same thread-safety issue.

One possible approach that I mentioned is chaining
WriteBatches:
every
time a new Interactive Query is received (i.e.
readOnlyView,
see
above,
is called) we "freeze" the existing WriteBatch, and
start a
new one
for
new
writes. The Interactive Query queries the "chain" of
previous
WriteBatches
+ the underlying database; while the StreamThread
starts
writing to
the
*new* WriteBatch. On-commit, the StreamThread would
write
*all*
WriteBatches in the chain to the database (that have
not
yet
been
written).

WriteBatches would be closed/freed only when they have
been
both
committed, and all open Interactive Queries on them
have
been
closed.
This
would require some reference counting.
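
(A rough sketch of the read path only, using RocksJava and assuming each frozen transaction is a WriteBatchWithIndex; it ignores deletes/tombstones inside the batches, which a real implementation would have to handle:)

import java.util.ArrayDeque;
import java.util.Deque;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;

class WriteBatchChainReader {
    // Newest (currently written) batch first, oldest frozen batch last.
    private final Deque<WriteBatchWithIndex> chain = new ArrayDeque<>();
    private final RocksDB db;
    private final DBOptions dbOptions;
    private final ReadOptions readOptions;

    WriteBatchChainReader(final RocksDB db, final DBOptions dbOptions,
                          final ReadOptions readOptions) {
        this.db = db;
        this.dbOptions = dbOptions;
        this.readOptions = readOptions;
    }

    byte[] get(final byte[] key) throws RocksDBException {
        // Check the uncommitted batches first, newest to oldest...
        for (final WriteBatchWithIndex batch : chain) {
            final byte[] value = batch.getFromBatch(dbOptions, key);
            if (value != null) {
                return value;
            }
        }
        // ...and fall back to the records already written to the database.
        return db.get(readOptions, key);
    }
}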

Obviously a drawback of this approach is the potential
for
increased
memory usage: if an Interactive Query is long-lived,
for
example by
doing a
full scan over a large database, or even just pausing
in
the
middle
of
an
iteration, then the existing chain of WriteBatches
could be
kept
around
for
a long time, potentially forever.

--

A.
Going off on a tangent, it looks like in addition to
supporting
READ_COMMITTED queries, we could go further and support
REPEATABLE_READ
queries (i.e. where subsequent reads to the same key
in the
same
Interactive Query are guaranteed to yield the same
value)
by
making
use
of
RocksDB Snapshots[2]. These are fairly lightweight, so
the
performance
impact is likely to be negligible, but they do require
that the
Interactive
Query session can be explicitly closed.

This could be achieved if we made the above
readOnlyView
interface
look
more like:

interface ReadOnlyKeyValueView<K, V> extends ReadOnlyKeyValueStore<K, V>, AutoCloseable {}

interface ReadOnlyKeyValueStore<K, V> {
        ...
        ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel isolationLevel);
}

But this would be a breaking change, as existing IQv1
queries
are
guaranteed to never call store.close(), and therefore
these
would
leak
memory under REPEATABLE_READ.
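
(For reference, a minimal sketch of how such a closeable REPEATABLE_READ view could pin a RocksDB snapshot via the plain RocksJava API; the class and method names are purely illustrative:)

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;

class SnapshotView implements AutoCloseable {
    private final RocksDB db;
    private final Snapshot snapshot;
    private final ReadOptions readOptions;

    SnapshotView(final RocksDB db) {
        this.db = db;
        this.snapshot = db.getSnapshot();
        this.readOptions = new ReadOptions().setSnapshot(snapshot);
    }

    byte[] get(final byte[] key) throws RocksDBException {
        // All reads through this view see the database as of snapshot creation.
        return db.get(readOptions, key);
    }

    @Override
    public void close() {
        db.releaseSnapshot(snapshot);
        readOptions.close();
    }
}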

B.
One thing that's notable: MyRocks states that they
support
READ_COMMITTED
and REPEATABLE_READ, but they make no mention of
READ_UNCOMMITTED[3][4].
This could be because doing so is technically
difficult/impossible
using
the primitives available in RocksDB.

--

Lucas, to address your points:

U1.
It's only "SHOULD" to permit alternative (i.e.
non-RocksDB)
implementations of StateStore that do not support
atomic
writes.
Obviously
in those cases, the guarantees Kafka Streams
provides/expects
would
be
relaxed. Do you think we should require all
implementations to
support
atomic writes?

U2.
Stores can support multiple IsolationLevels. As we've
discussed
above,
the
ideal scenario would be to specify the IsolationLevel
at
query-time.
Failing that, I think the second-best approach is to
define the
IsolationLevel for *all* queries based on the
processing.mode,
which
is
what the default StateStoreContext#isolationLevel()
achieves.
Would
you
prefer an alternative?

While the existing implementation is equivalent to
READ_UNCOMMITTED,
this
can yield unexpected results/errors under EOS, if a
transaction is
rolled
back. While this would be a change in behaviour for
users,
it
would
look
more like a bug fix than a breaking change. That said,
we
*could*
make
it
configurable, and default to the existing behaviour
(READ_UNCOMMITTED)
instead of inferring it from the processing.mode?

N1, N2.
These were only primitives to avoid boxing costs, but
since
this is
not
a
performance sensitive area, it should be fine to
change if
that's
desirable.

N3.
It's because the store "manages its own offsets", which
includes
both
committing the offset, *and providing it* via
getCommittedOffset().
Personally, I think "managesOffsets" conveys this best,
but I
don't
mind
changing it if the nomenclature is unclear.

Sorry for the massive emails/essays!
--
Nick

1:
https://github.com/facebook/rocksdb/wiki/Transactions
2: https://github.com/facebook/rocksdb/wiki/Snapshot
3:

https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
4:
https://mariadb.com/kb/en/myrocks-transactional-isolation/

On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:

Hi Nick,

since I last read it in April, the KIP has become much
cleaner and
easier to read. Great work!

It feels to me the last big open point is whether we
can
implement
isolation level as a query parameter. I understand
that
there
are
implementation concerns, but as Colt says, it would
be a
great
addition, and would also simplify the migration path
for
this
change.
Is the implementation problem you mentioned caused by
the
WriteBatch
not having a notion of a snapshot, as the underlying
DB
iterator
does?
In that case, I am not sure a chain of WriteBatches
as you
propose
would fully solve the problem, but maybe I didn't dig
enough
into
the
details to fully understand it.

If it's not possible to implement it now, would it be
an
option to
make sure in this KIP that we do not fully close the
door
on
per-query
isolation levels in the interface, as it may be
possible
to
implement
the missing primitives in RocksDB or Speedb in the
future.

Understanding:

* U1) Why is it only "SHOULD" for changelogOffsets to
be
persisted
atomically with the records?
* U2) Don't understand the default implementation of
`isolationLevel`.
The isolation level should be a property of the
underlying
store,
and
not be defined by the default config? Existing stores
probably
don't
guarantee READ_COMMITTED, so the default should be to
return
READ_UNCOMMITTED.

Nits:
* N1) Could `getCommittedOffset` use an `OptionalLong` return type, to
avoid the `null`?
* N2) Could `approximateNumUncommittedBytes` use an `OptionalLong`
return type, to avoid the `-1`?
* N3) I don't understand why `managesOffsets` uses the
'manage'
verb,
whereas all other methods use the "commits" verb. I'd
suggest
`commitsOffsets`.

Either way, it feels this KIP is very close to the
finish
line,
I'm
looking forward to seeing this in production!

Cheers,
Lucas

On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <
c...@littlehorse.io

wrote:

Making IsolationLevel a query-time constraint,
rather
than
linking
it
to
the processing.guarantee.

As I understand it, would this allow even a user of EOS to control
whether to read committed or uncommitted records? If so, I am
highly
in
favor
of
this.

I know that I was one of the early people to point
out
the
current
shortcoming that IQ reads uncommitted records, but
just
this
morning I
realized a pattern we use which means that (for
certain
queries)
our
system
needs to be able to read uncommitted records, which
is
the
current
behavior
of Kafka Streams in EOS.***

If IsolationLevel being a query-time decision allows
for
this,
then
that
would be amazing. I would also vote that the default
behavior
should
be
for
reading uncommitted records, because it is totally
possible
for a
valid
application to depend on that behavior, and breaking
it
in a
minor
release
might be a bit strong.

*** (Note, for the curious reader....) Our
use-case/query
pattern
is a
bit
complex, but reading "uncommitted" records is
actually
safe
in
our
case
because processing is deterministic. Additionally, IQ
being
able
to
read
uncommitted records is crucial to enable "read your
own
writes"
on
our
API:
Due to the deterministic processing, we send an
"ack" to
the
client
who
makes the request as soon as the processor processes
the
result.
If
they
can't read uncommitted records, they may receive a
"201 -
Created"
response, immediately followed by a "404 - Not Found"
when
doing
a
lookup
for the object they just created).

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <
nick.telf...@gmail.com>
wrote:

Addendum:

I think we would also face the same problem with the
approach
John
outlined
earlier (using the record cache as a transaction
buffer
and
flushing
it
straight to SST files). This is because the record
cache
(the
ThreadCache
class) is not thread-safe, so every commit would
invalidate
open
IQ
Iterators in the same way that RocksDB WriteBatches
do.
--
Nick

On Wed, 13 Sept 2023 at 16:58, Nick Telford <
nick.telf...@gmail.com>
wrote:

Hi Bruno,

I've updated the KIP based on our conversation. The
only
things
I've not
yet done are:

1. Using transactions under ALOS and EOS.
2. Making IsolationLevel a query-time constraint,
rather
than
linking it
to the processing.guarantee.

There's a wrinkle that makes this a challenge:
Interactive
Queries
that
open an Iterator, when using transactions and
READ_UNCOMMITTED.
The problem is that under READ_UNCOMMITTED, queries
need
to be
able
to
read records from the currently uncommitted
transaction
buffer
(WriteBatch). This includes for Iterators, which
should
iterate
both the
transaction buffer and underlying database (using
WriteBatch#iteratorWithBase()).
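
(In RocksJava this lives on WriteBatchWithIndex; a minimal, purely illustrative sketch of the merged iterator:)

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatchWithIndex;

final class UncommittedIterators {
    // Merges the uncommitted writes in the batch with a base iterator over the DB,
    // so READ_UNCOMMITTED range queries see both.
    static RocksIterator uncommittedIterator(final RocksDB db,
                                             final WriteBatchWithIndex batch,
                                             final ReadOptions readOptions) {
        final RocksIterator base = db.newIterator(readOptions);
        return batch.newIteratorWithBase(base);
    }
}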

The issue is that when the StreamThread commits, it
writes
the
current
WriteBatch to RocksDB *and then clears the
WriteBatch*.
Clearing
the
WriteBatch while an Interactive Query holds an open
Iterator on
it
will
invalidate the Iterator. Worse, it turns out that
Iterators
over
a
WriteBatch become invalidated not just when the
WriteBatch
is
cleared,
but
also when the Iterators' current key receives a new
write.

Now that I'm writing this, I remember that this is
the
major
reason
that
I
switched the original design from having a
query-time
IsolationLevel to
having the IsolationLevel linked to the
transactionality
of the
stores
themselves.

It *might* be possible to resolve this, by having a
"chain" of
WriteBatches, with the StreamThread switching to a
new
WriteBatch
whenever
a new Interactive Query attempts to read from the
database, but
that
could
cause some performance problems/memory pressure
when
subjected
to
a
high
Interactive Query load. It would also reduce the
efficiency of
WriteBatches
on-commit, as we'd have to write N WriteBatches,
where
N
is the
number of
Interactive Queries since the last commit.

I realise this is getting into the weeds of the
implementation,
and
you'd
rather we focus on the API for now, but I think
it's
important
to
consider
how to implement the desired API, in case we come
up
with
an
API
that
cannot be implemented efficiently, or even at all!

Thoughts?
--
Nick

On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <
cado...@apache.org

wrote:

Hi Nick,

6.
Of course, you are right! My bad!
Wiping out the state in the downgrading case is
fine.


3a.
Focus on the public facing changes for the KIP. We
will
manage
to
get
the internals right. Regarding state stores that
do
not
support
READ_COMMITTED, they should throw an error stating
that
they
do
not
support READ_COMMITTED. No need to adapt all state
stores
immediately.

3b.
I am in favor of using transactions also for ALOS.


Best,
Bruno

On 9/13/23 11:57 AM, Nick Telford wrote:
Hi Bruno,

Thanks for getting back to me!

2.
The fact that implementations can always track
estimated
memory
usage
in
the wrapper is a good point. I can remove -1 as
an
option,
and
I'll
clarify
the JavaDoc that 0 is not just for
non-transactional
stores,
which is
currently misleading.

6.
The problem with catching the exception in the
downgrade
process
is
that
would require new code in the Kafka version being
downgraded
to.
Since
users could conceivably downgrade to almost *any*
older
version
of
Kafka
Streams, I'm not sure how we could add that code?
The only way I can think of doing it would be to
provide
a
dedicated
downgrade tool, that goes through every local
store
and
removes
the
offsets column families. But that seems like an
unnecessary
amount of
extra
code to maintain just to handle a somewhat niche
situation,
when
the
alternative (automatically wipe and restore
stores)
should be
acceptable.

1, 4, 5: Agreed. I'll make the changes you've
requested.

3a.
I agree that IsolationLevel makes more sense at
query-time,
and
I
actually
initially attempted to place the IsolationLevel
at
query-time,
but I
ran
into some problems:
- The key issue is that, under ALOS we're not
staging
writes
in
transactions, so can't perform writes at the
READ_COMMITTED
isolation
level. However, this may be addressed if we
decide to
*always*
use
transactions as discussed under 3b.
- IQv1 and IQv2 have quite different
implementations. I
remember
having
some difficulty understanding the IQv1 internals,
which
made
it
difficult
to determine what needed to be changed. However,
I
*think*
this
can be
addressed for both implementations by wrapping
the
RocksDBStore
in an
IsolationLevel-dependent wrapper, that overrides
read
methods
(get,
etc.)
to either read directly from the database or
from the
ongoing
transaction.
But IQv1 might still be difficult.
- If IsolationLevel becomes a query constraint,
then
all
other
StateStores
will need to respect it, including the in-memory
stores.
This
would
require
us to adapt in-memory stores to stage their
writes so
they
can
be
isolated
from READ_COMMITTTED queries. It would also
become an
important
consideration for third-party stores on upgrade,
as
without
changes,
they
would not support READ_COMMITTED queries
correctly.

Ultimately, I may need some help making the
necessary
change
to
IQv1
to
support this, but I don't think it's
fundamentally
impossible,
if we
want
to pursue this route.

3b.
The main reason I chose to keep ALOS
un-transactional
was to
minimize
behavioural change for most users (I believe most
Streams
users
use
the
default configuration, which is ALOS). That said,
it's
clear
that if
ALOS
also used transactional stores, the only change
in
behaviour
would be
that
it would become *more correct*, which could be
considered a
"bug
fix"
by
users, rather than a change they need to handle.

I believe that performance using transactions
(aka.
RocksDB
WriteBatches)
should actually be *better* than the un-batched
write-path
that
is
currently used[1]. The only "performance"
consideration
will
be
the
increased memory usage that transactions require.
Given
the
mitigations
for
this memory that we have in place, I would expect
that
this
is
not a
problem for most users.

If we're happy to do so, we can make ALOS also
use
transactions.

Regards,
Nick

Link 1:







https://github.com/adamretter/rocksjava-write-methods-benchmark#results

On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <
cado...@apache.org

wrote:

Hi Nick,

Thanks for the updates and sorry for the delay
on my
side!


1.
Making the default implementation for flush() a
no-op
sounds
good to
me.


2.
I think what was bugging me here is that a
third-party
state
store
needs
to implement the state store interface. That
means
they
need
to
implement a wrapper around the actual state
store
as we
do
for
RocksDB
with RocksDBStore. So, a third-party state
store can
always
estimate
the
uncommitted bytes, if it wants, because the
wrapper
can
record
the
added
bytes.
One case I can think of where returning -1 makes
sense
is
when
Streams
does not need to estimate the size of the write
batch
and
trigger
extraordinary commits, because the third-party
state
store
takes care
of
memory. But in that case the method could also
just
return
0.
Even
that
case would be better solved with a method that
returns
whether
the
state
store manages itself the memory used for
uncommitted
bytes
or
not.
Said that, I am fine with keeping the -1 return
value,
I was
just
wondering when and if it will be used.

Regarding returning 0 for transactional state
stores
when
the
batch
is
empty, I was just wondering because you
explicitly
stated

"or {@code 0} if this StateStore does not
support
transactions."

So it seemed to me returning 0 could only
happen for
non-transactional
state stores.


3.

a) What do you think if we move the isolation
level
to
IQ
(v1
and
v2)?
In the end this is the only component that
really
needs
to
specify
the
isolation level. It is similar to the Kafka
consumer
that
can
choose
with what isolation level to read the input
topic.
For IQv1 the isolation level should go into
StoreQueryParameters. For
IQv2, I would add it to the Query interface.

b) Point a) raises the question what should
happen
during
at-least-once
processing when the state store does not use
transactions?
John
in
the
past proposed to also use transactions on state
stores
for
at-least-once. I like that idea, because it
avoids
aggregating
the
same
records over and over again in the case of a
failure. We
had a
case
in
the past where a Streams application in
at-least-once
mode
was
failing
continuously for some reasons I do not remember
before
committing the
offsets. After each failover, the app aggregated
again
and
again the
same records. Of course the aggregate increased
to
very
wrong
values
just because of the failover. With transactions
on
the
state
stores
we
could have avoided this. The app would have
output
the
same
aggregate
multiple times (i.e., after each failover) but
at
least
the
value of
the
aggregate would not depend on the number of
failovers.
Outputting the
same aggregate multiple times would be incorrect
under
exactly-once
but
it is OK for at-least-once.
If it makes sense to add a config to turn on
and off
transactions on
state stores under at-least-once or just use
transactions in
any case
is
a question we should also discuss in this KIP.
It
depends a
bit
on
the
performance trade-off. Maybe to be safe, I would
add a
config.


4.
Your points are all valid. I tend to say to
keep the
metrics
around
flush() until we remove flush() completely from
the
interface.
Calls
to
flush() might still exist since existing
processors
might
still
call
flush() explicitly as you mentioned in 1). For
sure, we
need
to
document
how the metrics change due to the transactions
in
the
upgrade
notes.


5.
I see. Then you should describe how the
.position
files
are
handled
in
a dedicated section of the KIP or incorporate
the
description
in the
"Atomic Checkpointing" section instead of only
mentioning it
in
the
"Compatibility, Deprecation, and Migration
Plan".


6.
Describing upgrading and downgrading in the KIP
is a
good
idea.
Regarding downgrading, I think you could also
catch
the
exception and
do
what is needed to downgrade, e.g., drop the
column
family.
See
here
for
an example:












https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75

It is a bit brittle, but it works.


Best,
Bruno


On 8/24/23 12:18 PM, Nick Telford wrote:
Hi Bruno,

Thanks for taking the time to review the KIP.
I'm
back
from
leave
now
and
intend to move this forwards as quickly as I
can.

Addressing your points:

1.
Because flush() is part of the StateStore API,
it's
exposed
to
custom
Processors, which might be making calls to
flush().
This
was
actually
the
case in a few integration tests.
To maintain as much compatibility as possible,
I'd
prefer
not
to
make
this
an UnsupportedOperationException, as it will
cause
previously
working
Processors to start throwing exceptions at
runtime.
I agree that it doesn't make sense for it to
proxy
commit(),
though,
as
that would cause it to violate the "StateStores
commit
only
when the
Task
commits" rule.
Instead, I think we should make this a no-op.
That
way,
existing
user
Processors will continue to work as-before,
without
violation
of
store
consistency that would be caused by premature
flush/commit
of
StateStore
data to disk.
What do you think?
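
(For concreteness, the no-op default could look roughly like this, trimmed to the two methods discussed here; the commit(Map) signature follows the KIP draft's wording and may differ in detail:)

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public interface StateStoreSketch {
    // Legacy method: custom Processors may still call it, so keep it but make it a
    // no-op instead of throwing, and instead of proxying commit().
    default void flush() {
        // no-op: data is persisted by commit(), which is driven by the Task commit
    }

    // Commits the currently buffered writes together with the given changelog offsets.
    void commit(Map<TopicPartition, Long> changelogOffsets);
}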

2.
As stated in the JavaDoc, when a StateStore
implementation
is
transactional, but is unable to estimate the
uncommitted
memory
usage,
the
method will return -1.
The intention here is to permit third-party
implementations
that may
not
be
able to estimate memory usage.

Yes, it will be 0 when nothing has been
written to
the
store
yet. I
thought
that was implied by "This method will return an
approximation
of the
memory
would be freed by the next call to {@link
#commit(Map)}"
and
"@return
The
approximate size of all records awaiting {@link
#commit(Map)}",
however,
I
can add it explicitly to the JavaDoc if you
think
this
is
unclear?

3.
I realise this is probably the most contentious
point
in my
design,
and
I'm
open to changing it if I'm unable to convince
you
of
the
benefits.
Nevertheless, here's my argument:
The Interactive Query (IQ) API(s) are directly
provided
StateStores
to
query, and it may be important for users to
programmatically
know
which
mode the StateStore is operating under. If we
simply
provide
an
"eosEnabled" boolean (as used throughout the
internal
streams
engine), or
similar, then users will need to understand the
operation
and
consequences
of each available processing mode and how it
pertains
to
their
StateStore.

Interactive Query users aren't the only people
that
care
about
the
processing.mode/IsolationLevel of a StateStore:
implementers
of
custom
StateStores also need to understand the
behaviour
expected
of
their
implementation. KIP-892 introduces some
assumptions
into
the
Streams
Engine
about how StateStores operate under each
processing
mode,
and
it's
important that custom implementations adhere to
those
assumptions in
order
to maintain the consistency guarantees.

IsolationLevels provide a high-level contract
on
the
behaviour
of
the
StateStore: a user knows that under
READ_COMMITTED,
they
will
see
writes
only after the Task has committed, and under
READ_UNCOMMITTED
they
will
see
writes immediately. No understanding of the
details of
each
processing.mode
is required, either for IQ users or StateStore
implementers.

An argument can be made that these contractual
guarantees
can
simply
be
documented for the processing.mode (i.e. that
exactly-once
and
exactly-once-v2 behave like READ_COMMITTED and
at-least-once
behaves
like
READ_UNCOMMITTED), but there are several small
issues
with
this I'd
prefer
to avoid:

         - Where would we document these contracts, in a way that is
         difficult for users/implementers to miss/ignore?
         - It's not clear to users that the processing mode is communicating
         an expectation of read isolation, unless they read the documentation.
         Users rarely consult documentation unless they feel they need to, so
         it's likely this detail would get missed by many users.
         - It tightly couples processing modes to read isolation. Adding new
         processing modes, or changing the read isolation of existing
         processing modes would be difficult/impossible.

Ultimately, the cost of introducing
IsolationLevels is
just a
single
method, since we re-use the existing
IsolationLevel
enum
from
Kafka.
This
gives us a clear place to document the
contractual
guarantees
expected
of/provided by StateStores, that is accessible
both by
the
StateStore
itself, and by IQ users.

(Writing this I've just realised that the
StateStore
and IQ
APIs
actually
don't provide access to StateStoreContext that
IQ
users
would
have
direct
access to... Perhaps StateStore should expose
isolationLevel()
itself
too?)

4.
Yeah, I'm not comfortable renaming the metrics
in-place
either, as
it's a
backwards incompatible change. My concern is
that,
if
we
leave
the
existing
"flush" metrics in place, they will be
confusing to
users.
Right
now,
"flush" metrics record explicit flushes to
disk,
but
under
KIP-892,
even
a
commit() will not explicitly flush data to
disk -
RocksDB
will
decide
on
when to flush memtables to disk itself.

If we keep the existing "flush" metrics, we'd
have
two
options,
which
both
seem pretty bad to me:

         1. Have them record calls to commit(), which would be misleading, as
         data is no longer explicitly "flushed" to disk by this call.
         2. Have them record nothing at all, which is equivalent to removing
         the metrics, except that users will see the metric still exists and
         so assume that the metric is correct, and that there's a problem with
         their system when there isn't.

I agree that removing them is also a bad
solution,
and
I'd
like some
guidance on the best path forward here.

5.
Position files are updated on every write to a
StateStore.
Since our
writes
are now buffered until commit(), we can't
update
the
Position
file
until
commit() has been called, otherwise it would be
inconsistent
with
the
data
in the event of a rollback. Consequently, we
need
to
manage
these
offsets
the same way we manage the checkpoint offsets,
and
ensure
they're
only
written on commit().

6.
Agreed, although I'm not exactly sure yet what
tests to
write.
How
explicit
do we need to be here in the KIP?

As for upgrade/downgrade: upgrade is designed
to be
seamless,
and we
should
definitely add some tests around that.
Downgrade,
it
transpires,
isn't
currently possible, as the extra column family
for
offset
storage is
incompatible with the pre-KIP-892
implementation:
when
you
open a
RocksDB
database, you must open all available column
families
or
receive an
error.
What currently happens on downgrade is that it
attempts to
open the
store,
throws an error about the offsets column
family not
being
opened,
which
triggers a wipe and rebuild of the Task. Given
that
downgrades
should
be
uncommon, I think this is acceptable
behaviour, as
the
end-state is
consistent, even if it results in an
undesirable
state
restore.

Should I document the upgrade/downgrade
behaviour
explicitly
in the
KIP?

--

Regards,
Nick


On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <
cado...@apache.org>
wrote:

Hi Nick!

Thanks for the updates!

1.
Why does StateStore#flush() default to
StateStore#commit(Collections.emptyMap())?
Since calls to flush() will not exist anymore
after
this
KIP
is
released, I would rather throw an unsupported
operation
exception
by
default.


2.
When would a state store return -1 from
StateStore#approximateNumUncommittedBytes()
while
being
transactional?

Wouldn't
StateStore#approximateNumUncommittedBytes()
also
return 0
if
the state store is transactional but nothing
has
been
written
to
the
state store yet?


3.
Sorry for bringing this up again. Does this
KIP
really
need
to
introduce
StateStoreContext#isolationLevel()?
StateStoreContext
has
already
appConfigs() which basically exposes the same
information,
i.e., if
EOS
is enabled or not.
In one of your previous e-mails you wrote:

"My idea was to try to keep the StateStore
interface
as
loosely
coupled
from the Streams engine as possible, to give
implementers
more
freedom,
and reduce the amount of internal knowledge
required."

While I understand the intent, I doubt that it
decreases
the
coupling of
a StateStore interface and the Streams engine.
READ_COMMITTED
only
applies to IQ but not to reads by processors.
Thus,
implementers
need to
understand how Streams accesses the state
stores.

I would like to hear what others think about
this.


4.
Great exposing new metrics for transactional
state
stores!
However, I
would prefer to add new metrics and deprecate
(in
the
docs)
the old
ones. You can find examples of deprecated
metrics
here:

https://kafka.apache.org/documentation/#selector_monitoring


5.
Why does the KIP mention position files? I do
not
think
they
are
related
to transactions or flushes.


6.
I think we will also need to adapt/add
integration
tests
besides
unit
tests. Additionally, we probably need
integration
or
system
tests
to
verify that upgrades and downgrades between
transactional
and
non-transactional state stores work as
expected.


Best,
Bruno





On 7/21/23 10:34 PM, Nick Telford wrote:
One more thing: I noted John's suggestion in
the
KIP,
under
"Rejected
Alternatives". I still think it's an idea
worth
pursuing,
but I
believe
that it's out of the scope of this KIP,
because
it
solves a
different
set
of problems to this KIP, and the scope of
this
one
has
already
grown
quite
large!

On Fri, 21 Jul 2023 at 21:33, Nick Telford <
nick.telf...@gmail.com>
wrote:

Hi everyone,

I've updated the KIP (












https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
)
with the latest changes; mostly bringing
back
"Atomic
Checkpointing"
(for
what feels like the 10th time!). I think
the one
thing
missing is
some
changes to metrics (notably the store
"flush"
metrics
will
need
to
be
renamed to "commit").

The reason I brought back Atomic
Checkpointing
was
to
decouple
store
flush
from store commit. This is important,
because
with
Transactional
StateStores, we now need to call "flush" on
*every*
Task
commit,
and
not
just when the StateStore is closing,
otherwise
our
transaction
buffer
will
never be written and persisted, instead
growing
unbounded!
I
experimented
with some simple solutions, like forcing a
store
flush
whenever
the
transaction buffer was likely to exceed its
configured
size, but
this
was
brittle: it prevented the transaction buffer
from
being
configured
to
be
unbounded, and it still would have required
explicit
flushes of
RocksDB,
yielding sub-optimal performance and memory
utilization.

I deemed Atomic Checkpointing to be the
"right"
way
to
resolve
this
problem. By ensuring that the changelog
offsets
that
correspond
to
the
most
recently written records are always
atomically
written
to
the
StateStore
(by writing them to the same transaction
buffer),
we can
avoid
forcibly
flushing the RocksDB memtables to disk,
letting
RocksDB
flush
them
only
when necessary, without losing any of our
consistency
guarantees.
See
the
updated KIP for more info.
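
(To illustrate the idea only: the changelog offset is staged in the same batch as the records it corresponds to, so both hit RocksDB atomically. The column-family layout and key encoding below are assumptions, not the KIP's exact scheme:)

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;

final class AtomicCheckpointSketch {
    static void stageWrite(final WriteBatchWithIndex batch,
                           final ColumnFamilyHandle dataCf,
                           final ColumnFamilyHandle offsetsCf,
                           final byte[] key,
                           final byte[] value,
                           final String changelogPartition,
                           final long changelogOffset) throws RocksDBException {
        // The record and the offset that produced it land in the same batch,
        // so a crash can never persist one without the other.
        batch.put(dataCf, key, value);
        batch.put(offsetsCf,
                  changelogPartition.getBytes(StandardCharsets.UTF_8),
                  ByteBuffer.allocate(Long.BYTES).putLong(changelogOffset).array());
    }
}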

I have fully implemented these changes,
although I'm
still
not
entirely
happy with the implementation for segmented
StateStores,
so
I
plan
to
refactor that. Despite that, all tests
pass. If
you'd
like
to try
out
or
review this highly experimental and
incomplete
branch,
it's
available
here:

https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0
.
Note:
it's
built
against Kafka 3.5.0 so that I had a stable
base
to
build
and test
it
on,
and to enable easy apples-to-apples
comparisons
in a
live
environment. I
plan to rebase it against trunk once it's
nearer
completion
and
has
been
proven on our main application.

I would really appreciate help in reviewing
and
testing:
- Segmented (Versioned, Session and Window)
stores
- Global stores

As I do not currently use either of these,
so my
primary
test
environment
doesn't test these areas.

I'm going on Parental Leave starting next
week
for
a few
weeks,
so
will
not have time to move this forward until
late
August.
That
said,
your
feedback is welcome and appreciated, I just
won't be
able
to
respond
as
quickly as usual.

Regards,
Nick

On Mon, 3 Jul 2023 at 16:23, Nick Telford <
nick.telf...@gmail.com>
wrote:

Hi Bruno

Yes, that's correct, although the impact
on IQ
is
not
something
I
had
considered.

What about atomically updating the state
store
from the
transaction
buffer every commit interval and writing
the
checkpoint
(thus,
flushing
the memtable) every configured amount of
data
and/or
number of
commit
intervals?


I'm not quite sure I follow. Are you
suggesting
that we
add an
additional
config for the max number of commit
intervals
between
checkpoints?
That
way, we would checkpoint *either* when the
transaction
buffers
are
nearly
full, *OR* whenever a certain number of
commit
intervals
have
elapsed,
whichever comes first?

That certainly seems reasonable, although
this
re-ignites
an
earlier
debate about whether a config should be
measured in
"number of
commit
intervals", instead of just an absolute
time.

FWIW, I realised that this issue is the
reason
I
was
pursuing
the
Atomic
Checkpoints, as it de-couples memtable
flush
from
checkpointing,
which
enables us to just checkpoint on every
commit
without
any
performance
impact. Atomic Checkpointing is definitely
the
"best"
solution,
but
I'm not
sure if this is enough to bring it back
into
this
KIP.

I'm currently working on moving all the
transactional
logic
directly
into
RocksDBStore itself, which does away with
the
StateStore#newTransaction
method, and reduces the number of new
classes
introduced,
significantly
reducing the complexity. If it works, and
the
complexity
is
drastically
reduced, I may try bringing back Atomic
Checkpoints
into
this
KIP.

Regards,
Nick

On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna
<
cado...@apache.org>
wrote:

Hi Nick,

Thanks for the insights! Very interesting!

As far as I understand, you want to
atomically
update
the
state
store
from the transaction buffer, flush the
memtable
of a
state
store
and
write the checkpoint not after the commit
time
elapsed
but
after
the
transaction buffer reached a size that
would
lead
to
exceeding
statestore.transaction.buffer.max.bytes
before the
next
commit
interval
ends.
That means, the Kafka transaction would
commit
every
commit
interval
but
the state store will only be atomically
updated
roughly
every
statestore.transaction.buffer.max.bytes of
data.
Also
IQ
would
then
only
see new data roughly every
statestore.transaction.buffer.max.bytes.
After a failure the state store needs to
restore
up to
statestore.transaction.buffer.max.bytes.

Is this correct?

What about atomically updating the state
store
from
the
transaction
buffer every commit interval and writing
the
checkpoint
(thus,
flushing
the memtable) every configured amount of
data
and/or
number of
commit
intervals? In such a way, we would have
the
same
delay
for
records
appearing in output topics and IQ because
both
would
appear
when
the
Kafka transaction is committed. However,
after a
failure
the
state
store
still needs to restore up to
statestore.transaction.buffer.max.bytes
and
it might restore data that is already in
the
state
store
because
the
checkpoint lags behind the last stable
offset
(i.e.
the
last
committed
offset) of the changelog topics. Restoring
data
that
is
already
in
the
state store is idempotent, so EOS should not be violated.
This solution needs at least one new
config to
specify
when a
checkpoint
should be written.



A small correction to your previous e-mail
that
does
not
change
anything
you said: Under ALOS the default commit
interval
is 30
seconds,
not
five
seconds.


Best,
Bruno


On 01.07.23 12:37, Nick Telford wrote:
Hi everyone,

I've begun performance testing my branch
on
our
staging
environment,
putting it through its paces in our
non-trivial
application.
I'm
already
observing the same increased flush rate
that
we
saw
the
last
time
we
attempted to use a version of this KIP,
but
this
time,
I
think I
know
why.

Pre-KIP-892, StreamTask#postCommit,
which is
called
at
the end
of
the
Task
commit process, has the following
behaviour:

           - Under ALOS: checkpoint the
state
stores.
This
includes
           flushing memtables in RocksDB.
This is
acceptable
because the
default
           commit.interval.ms is 5
seconds,
so
forcibly
flushing
memtables
every 5
           seconds is acceptable for most
applications.
           - Under EOS: checkpointing is
not
done,
*unless*
it's
being
forced, due
           to e.g. the Task closing or
being
revoked.
This
means
that
under
normal
           processing conditions, the
state
stores
will
not
be
checkpointed,
and will
           not have memtables flushed at
all ,
unless
RocksDB
decides to
flush them on
           its own. Checkpointing stores
and
force-flushing
their
memtables
is only
           done when a Task is being
closed.

Under EOS, KIP-892 needs to checkpoint
stores on
at
least
*some*
normal
Task commits, in order to write the
RocksDB
transaction
buffers
to
the
state stores, and to ensure the offsets
are
synced to
disk to
prevent
restores from getting out of hand.
Consequently,
my
current
implementation
calls maybeCheckpoint on *every* Task
commit,
which
is
far too
frequent.
This causes checkpoints every 10,000
records,
which
is
a
change
in
flush
behaviour, potentially causing
performance
problems
for
some
applications.

I'm looking into possible solutions, and
I'm
currently
leaning
towards
using the
statestore.transaction.buffer.max.bytes
configuration
to
checkpoint Tasks once we are likely to
exceed it.
This
would
complement the
existing "early Task commit"
functionality
that
this
configuration
provides, in the following way:

           - Currently, we use
statestore.transaction.buffer.max.bytes
to
force an
           early Task commit if processing
more
records
would
cause
our
state
store
           transactions to exceed the
memory
assigned
to
them.
           - New functionality: when a
Task
*does*
commit,
we will
not
checkpoint
           the stores (and hence flush the
transaction
buffers)
unless
we
expect to
           cross the
statestore.transaction.buffer.max.bytes
threshold
before
the next
           commit

I'm also open to suggestions.
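
(As a rough sketch of the commit-time decision I have in mind; the names and the very naive estimate of the next interval's writes are illustrative only:)

final class CheckpointDecisionSketch {
    private final long maxTransactionBufferBytes; // statestore.transaction.buffer.max.bytes
    private long bytesWrittenInLastCommitInterval;

    CheckpointDecisionSketch(final long maxTransactionBufferBytes) {
        this.maxTransactionBufferBytes = maxTransactionBufferBytes;
    }

    // Called on every Task commit: checkpoint (and flush the transaction buffers)
    // only if the uncommitted bytes plus the expected writes of the next interval
    // would cross the configured threshold.
    boolean shouldCheckpoint(final long currentUncommittedBytes) {
        return currentUncommittedBytes + bytesWrittenInLastCommitInterval
                >= maxTransactionBufferBytes;
    }

    void onCommitIntervalFinished(final long bytesWrittenThisInterval) {
        this.bytesWrittenInLastCommitInterval = bytesWrittenThisInterval;
    }
}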

Regards,
Nick

On Thu, 22 Jun 2023 at 14:06, Nick
Telford <
nick.telf...@gmail.com

wrote:

Hi Bruno!

3.
By "less predictable for users", I
meant in
terms of
understanding
the
performance profile under various
circumstances. The
more
complex
the
solution, the more difficult it would
be for
users
to
understand
the
performance they see. For example,
spilling
records
to
disk
when
the
transaction buffer reaches a threshold
would, I
expect,
reduce
write
throughput. This reduction in write
throughput
could
be
unexpected,
and
potentially difficult to
diagnose/understand for
users.
At the moment, I think the "early
commit"
concept is
relatively
straightforward; it's easy to document,
and
conceptually
fairly
obvious to
users. We could probably add a metric to
make it
easier
to
understand
when
it happens though.

3. (the second one)
The IsolationLevel is *essentially* an
indirect
way
of
telling
StateStores
whether they should be transactional.
READ_COMMITTED
essentially
requires
transactions, because it dictates that
two
threads
calling
`newTransaction()` should not see writes
from
the
other
transaction
until
they have been committed. With
READ_UNCOMMITTED, all
bets are
off,
and
stores can allow threads to observe
written
records
at
any
time,
which is
essentially "no transactions". That
said,
StateStores
are
free
to
implement
these guarantees however they can,
which is
a
bit
more
relaxed
than
dictating "you must use transactions".
For
example,
with
RocksDB
we
would
implement these as READ_COMMITTED ==
WBWI-based
"transactions",
READ_UNCOMMITTED == direct writes to the
database.
But
with
other
storage
engines, it might be preferable to
*always*
use
transactions,
even
when
unnecessary; or there may be storage
engines
that
don't
provide
transactions, but the isolation
guarantees
can
be
met
using a
different
technique.
My idea was to try to keep the
StateStore
interface
as
loosely
coupled
from the Streams engine as possible, to
give
implementers
more
freedom, and
reduce the amount of internal knowledge
required.
That said, I understand that
"IsolationLevel"
might
not
be
the
right
abstraction, and we can always make it
much
more
explicit if
required, e.g.
boolean transactional()
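
(Purely illustrative sketch of that mapping for a RocksDB-backed store, reusing the existing IsolationLevel enum:)

import org.apache.kafka.common.IsolationLevel;

final class WriteModeSketch {
    // READ_COMMITTED: stage writes in a WriteBatchWithIndex "transaction" until commit.
    // READ_UNCOMMITTED: write directly to the database.
    static boolean useWriteBatch(final IsolationLevel isolationLevel) {
        return isolationLevel == IsolationLevel.READ_COMMITTED;
    }
}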

7-8.
I can make these changes either later
today
or
tomorrow.

Small update:
I've rebased my branch on trunk and
fixed a
bunch of
issues
that
needed
addressing. Currently, all the tests
pass,
which is
promising,
but
it
will
need to undergo some performance
testing. I
haven't
(yet)
worked
on
removing the `newTransaction()` stuff,
but I
would
expect
that,
behaviourally, it should make no
difference. The
branch
is
available
at

https://github.com/nicktelford/kafka/tree/KIP-892-c
if
anyone
is
interested in taking an early look.

Regards,
Nick

On Thu, 22 Jun 2023 at 11:59, Bruno
Cadonna
<
cado...@apache.org>
wrote:

Hi Nick,

1.
Yeah, I agree with you. That was
actually
also
my
point. I
understood
that John was proposing the ingestion
path
as
a way
to
avoid
the
early
commits. Probably, I misinterpreted the
intent.

2.
I agree with John here, that actually
it is
public
API. My
question
is
how this usage pattern affects normal
processing.

3.
My concern is that checking for the
size
of the
transaction
buffer
and
maybe triggering an early commit
affects
the
whole
processing
of
Kafka
Streams. The transactionality of a
state
store
is
not
confined to
the
state store itself, but spills over and
changes the
behavior
of
other
parts of the system. I agree with you
that
it
is a
decent
compromise. I
just wanted to analyse the downsides
and
list
the
options to
overcome
them. I also agree with you that all
options
seem
quite
heavy
compared
with your KIP. I do not understand
what you
mean
with
"less
predictable
for users", though.


I found the discussions about the
alternatives
really
interesting.
But I
also think that your plan sounds good
and
we
should
continue
with
it!


Some comments on your reply to my
e-mail on
June
20th:

3.
Ah, now, I understand the reasoning
behind
putting
isolation
level
in
the state store context. Thanks! Should
that
also
be
a
way
to
give
the
the state store the opportunity to
decide
whether
to
turn on
transactions or not?
With my comment, I was more concerned
about
how do
you
know
if a
checkpoint file needs to be written
under
EOS,
if
you
do not
have a
way
to know if the state store is
transactional or
not.
If
a
state
store
is
transactional, the checkpoint file can
be
written
during
normal
processing under EOS. If the state
store
is not
transactional,
the
checkpoint file must not be written
under
EOS.

7.
My point was about not only
considering the
bytes
in
memory
in
config
statestore.uncommitted.max.bytes, but
also
bytes
that
might
be
spilled
on disk. Basically, I was wondering
whether you
should
remove
the
"memory" in "Maximum number of memory
bytes to
be
used
to
buffer uncommitted state-store
records." My
thinking
was
that
even
if a
state store spills uncommitted bytes to
disk,
limiting
the
overall
bytes
might make sense. Thinking about it
again
and
considering
the
recent
discussions, it does not make too much
sense
anymore.
I like the name
statestore.transaction.buffer.max.bytes that
you
proposed.

8.
A high-level description (without
implementation
details) of
how
Kafka
Streams will manage the commit of
changelog
transactions,
state
store
transactions and checkpointing would be
great.
Would
be
great
if
you
could also add some sentences about the
behavior in
case of
a
failure.
For instance how does a transactional
state
store
recover
after a
failure or what happens with the
transaction
buffer,
etc.
(that
is
what
I meant by "fail-over" in point 9.)

Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:
Hi Bruno,

1.
Isn't this exactly the same issue that
WriteBatchWithIndex
transactions
have, whereby exceeding (or likely to
exceed)
configured
memory
needs to
trigger an early commit?

2.
This is one of my big concerns.
Ultimately,
any
approach
based
on
cracking
open RocksDB internals and using it in
ways
it's
not
really
designed
for is
likely to have some unforseen
performance
or
consistency
issues.

3.
What's your motivation for removing
these
early
commits?
While
not
ideal, I
think they're a decent compromise to
ensure
consistency
whilst
maintaining
good and predictable performance.
All 3 of your suggested ideas seem
*very*
complicated, and
might
actually
make behaviour less predictable for
users
as a
consequence.

I'm a bit concerned that the scope of
this
KIP is
growing a
bit
out
of
control. While it's good to discuss
ideas
for
future
improvements, I
think

it's important to narrow the scope
down
to a
design
that
achieves
the
most
pressing objectives (constant sized
restorations
during
dirty
close/unexpected errors). Any design
that
this KIP
produces
can
ultimately
be changed in the future, especially
if
the
bulk
of
it is
internal
behaviour.

I'm going to spend some time next week
trying
to
re-work
the
original
WriteBatchWithIndex design to remove
the
newTransaction()
method,
such
that
it's just an implementation detail of
RocksDBStore.
That
way, if
we
want to
replace WBWI with something in the
future,
like
the
SST
file
management
outlined by John, then we can do so
with
little/no
API
changes.

Regards,

Nick



























