Hi Nick,

1.
Yeah, you are probably right that it does not make too much sense. Thanks for the clarification!


4.
Yes, sorry for the back and forth, but I think for the sake of the KIP it is better to leave the ALOS behavior as it is for now, due to the possible issues you would run into. Maybe we can find a solution in the future. Now the question returns to whether we really need default.state.isolation.level. Maybe the config could be the feature flag Sophie requested.


5.
There is a guideline in Kafka not to use the get prefix for getters (at least in the public API). Thus, could you please rename

getCommittedOffset(TopicPartition partition) -> committedOffsetFor(TopicPartition partition)

You can also propose an alternative to committedOffsetFor().
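For illustration, here's a rough sketch of how the renamed method might look (the containing interface name is made up, and the OptionalLong return type is only Lucas's earlier suggestion from this thread, not something already agreed):

import java.util.OptionalLong;
import org.apache.kafka.common.TopicPartition;

// Hypothetical container for the method under discussion.
public interface StateStoreOffsets {
    /**
     * Hypothetical replacement for getCommittedOffset(): the changelog
     * offset most recently committed for the given partition, or empty
     * if the store holds no offset for it.
     */
    OptionalLong committedOffsetFor(TopicPartition partition);
}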


Best,
Bruno


On 10/13/23 3:21 PM, Nick Telford wrote:
Hi Bruno,

Thanks for getting back to me.

1.
I think this should be possible. Are you thinking of the situation where a
user may downgrade to a previous version of Kafka Streams? In that case,
sadly, the RocksDBStore would get wiped by the older version of Kafka
Streams anyway, because that version wouldn't understand the extra column
family (that holds offsets), so the missing Position file would
automatically get rebuilt when the store is rebuilt from the changelog.
Are there other situations than downgrade where a transactional store could
be replaced by a non-transactional one? I can't think of any.

2.
Ahh yes, the Test Plan - my Kryptonite! This section definitely needs to be
fleshed out. I'll work on that. How much detail do you need?

3.
See my previous email discussing this.

4.
Hmm, this is an interesting point. Are you suggesting that under ALOS
READ_COMMITTED should not be supported?

Regards,
Nick

On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

I think the KIP is converging!


1.
I am wondering whether it makes sense to write the position file during close, as we do for the checkpoint file, so that if the state store is replaced with a non-transactional one, the non-transactional store still finds the position file. I think this is not strictly needed, but it would be nicer behavior than just deleting the position file.


2.
The test plan does not mention integration tests. Do you not need to extend existing ones and add new ones? Also, for upgrading and downgrading you might need integration and/or system tests.


3.
I think Sophie made a good point, although IQ reading from uncommitted data under EOS might be considered a bug by some people. From that angle, your KIP would fix a bug rather than change the intended behavior. However, I also see that a feature flag would help users that rely on this buggy behavior (at least until AK 4.0).


4.
This is related to the previous point. I assume that the difference between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the former you enable transactions on the state store and in the latter you disable them. If my assumption is correct, I think that is an issue. Let's assume that under ALOS, Streams fails over a couple of times at more or less the same step in processing, after value 3 is added to an aggregation but before the offset of the corresponding input record is committed. With transactions disabled, the aggregation value would increase by 3 on each failover. With transactions enabled, value 3 would only be added to the aggregation once, when the offset of the input record is committed and the transaction finally completes. So the content of the state store would change depending on the configuration for IQ. IMO, the content of the state store should be independent of IQ. Given this issue, I propose not to use transactions with ALOS at all. I was a big proponent of using transactions with ALOS, but I realized that transactions with ALOS are not as easy as enabling transactions on state stores. Another problematic aspect is that the changelog topic, which actually replicates the state store, is not transactional under ALOS. Thus, the state store and the changelog might differ in their content. All of this is maybe solvable somehow, but for the sake of this KIP, I would leave it for the future.
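To make the failure mode concrete, here is a small, purely illustrative simulation of the scenario above (not Streams code):

public class AlosDoubleCountExample {
    public static void main(String[] args) {
        long committedAggregate = 10; // state at the last committed offset
        long inputValue = 3;          // record whose offset never commits

        // Non-transactional store: the write survives each crash even
        // though the offset commit failed, so every failover re-applies
        // the same input record on top of it.
        long nonTxAggregate = committedAggregate;
        for (int failover = 1; failover <= 3; failover++) {
            nonTxAggregate += inputValue;
        }
        System.out.println(nonTxAggregate); // 19: grew by 3 per failover

        // Transactional store: the write only becomes visible once the
        // offset commits, so the result is 13 regardless of failovers.
        System.out.println(committedAggregate + inputValue);
    }
}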


Best,
Bruno



On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote:
Hey Nick! First of all, thanks for taking up this awesome feature; I'm sure every single Kafka Streams user and dev would agree that it is sorely needed.

I've just been catching up on the KIP and surrounding discussion, so please forgive me for any misunderstandings or misinterpretations of the current plan, and don't hesitate to correct me.

Before I jump in, I just want to say that, having seen this drag on for so long, my singular goal in responding is to help this KIP past a perceived impasse so we can finally move on to voting and implementing it. Long discussions are to be expected for major features like this, but it's completely on us as the Streams devs to make sure there is an end in sight for any ongoing discussion.

With that said, it's my understanding that the KIP as currently proposed is just not tenable for Kafka Streams, and would prevent some EOS users from upgrading to the version it first appears in. Given that we can't predict or guarantee whether any of the follow-up KIPs would be completed in the same release cycle as this one, we need to make sure that the feature is either compatible with all current users or else feature-flagged so that they may opt in/out.

Therefore, IIUC we need to have either (or both) of these as
fully-implemented config options:
1. default.state.isolation.level
2. enable.transactional.state.stores
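To illustrate (both config names are just the candidates from this discussion, not final):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class FeatureFlagExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                  StreamsConfig.EXACTLY_ONCE_V2);
        // Option 1: choose the IQ isolation level explicitly.
        props.put("default.state.isolation.level", "READ_UNCOMMITTED");
        // Option 2: a plain feature flag for the new stores.
        props.put("enable.transactional.state.stores", "false");
    }
}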

This way EOS users for whom read_committed semantics are not viable can still upgrade, and either use the isolation.level config to leverage the new txn state stores without sacrificing their application semantics, or else simply keep the transactional state stores disabled until we are able to fully implement the isolation level configuration at either an application or query level.

Frankly, you are the expert here and know much more about the tradeoffs in both semantics and effort level of implementing one of these configs vs the other. In my opinion, either option would be fine, and I would leave the decision of which one to include in this KIP completely up to you. I just don't see a way for the KIP to proceed without some variation of the above that would allow EOS users to opt out of read_committed.

(If it's all the same to you, I would recommend always including a feature flag in large structural changes like this. No matter how much I trust someone or myself to implement a feature, you just never know what kind of bugs might slip in, especially with the very first iteration that gets released. So personally, my choice would be to add the feature flag and leave it off by default. If all goes well, you can do a quick KIP to enable it by default as soon as the isolation.level config has been completed. But feel free to just pick whichever option is easiest or quickest for you to implement.)

Hope this helps move the discussion forward,
Sophie

On Tue, Sep 19, 2023 at 1:57 AM Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno,

Agreed, I can live with that for now.

In an effort to keep the scope of this KIP from expanding, I'm leaning towards just providing a configurable default.state.isolation.level and removing IsolationLevel from the StateStoreContext. This would be compatible with adding support for query-time IsolationLevels in the future, whilst providing a way for users to select an isolation level now.

The big problem with this, however, is that if a user selects processing.mode = "exactly-once(-v2|-beta)" and default.state.isolation.level = "READ_UNCOMMITTED", we need to guarantee that the data isn't written to disk until commit() is called, but we also need to permit IQ threads to read from the ongoing transaction.

A simple solution would be to (temporarily) forbid this combination of configuration, and have default.state.isolation.level automatically switch to READ_COMMITTED when processing.mode is anything other than at-least-once. Do you think this would be acceptable?
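A sketch of the kind of check this implies (purely illustrative; the config plumbing is hand-waved):

import org.apache.kafka.common.IsolationLevel;

final class IsolationLevelResolver {
    static IsolationLevel resolve(String processingMode,
                                  IsolationLevel configured) {
        // Temporarily forbidden combination: anything other than
        // at-least-once forces READ_COMMITTED, until a later KIP adds
        // query-time isolation levels and relaxes this restriction.
        if (!"at-least-once".equals(processingMode)
                && configured == IsolationLevel.READ_UNCOMMITTED) {
            return IsolationLevel.READ_COMMITTED;
        }
        return configured;
    }
}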

In a later KIP, we can add support for query-time isolation levels and
solve this particular problem there, which would relax this restriction.

Regards,
Nick

On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <cado...@apache.org> wrote:

Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think it is perfectly valid to say that InMemoryKeyValueStore does not support READ_COMMITTED for now, since READ_UNCOMMITTED is the de facto default at the moment.

Best,
Bruno

On 9/18/23 7:12 PM, Nick Telford wrote:
Oh! One other concern I haven't mentioned: if we make IsolationLevel a query-time constraint, then we need to add support for READ_COMMITTED to InMemoryKeyValueStore too, which will require some changes to the implementation.

On Mon, 18 Sept 2023 at 17:24, Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

I agree that having IsolationLevel be determined at query-time is the
ideal design, but there are a few sticking points:

1.
There needs to be some way to communicate the IsolationLevel down to the RocksDBStore itself, so that the query can respect it. Since stores are "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, etc.), we need some way to deliver that information to the bottom layer. For IQv2, we can use the existing State#query() method, but IQv1 has no way to do this.

A simple approach, which would potentially open up other options, would be to add something like ReadOnlyKeyValueStore<K, V> readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore (and similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.).

2.
As mentioned above, RocksDB WriteBatches are not thread-safe, which causes a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a look at RocksDB Transactions[1], but they solve a very different problem, and have the same thread-safety issue.

One possible approach that I mentioned is chaining WriteBatches: every time a new Interactive Query is received (i.e. readOnlyView, see above, is called) we "freeze" the existing WriteBatch, and start a new one for new writes. The Interactive Query queries the "chain" of previous WriteBatches + the underlying database, while the StreamThread starts writing to the *new* WriteBatch. On commit, the StreamThread would write *all* WriteBatches in the chain to the database (that have not yet been written).

WriteBatches would be closed/freed only when they have been both committed, and all open Interactive Queries on them have been closed. This would require some reference counting.
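Here's a very rough sketch of that idea, assuming WriteBatchWithIndex as the transaction buffer; the reference counting and the merged iteration over the chain are elided:

import java.util.ArrayDeque;
import java.util.Deque;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

final class WriteBatchChain {
    private final RocksDB db;
    private final Deque<WriteBatchWithIndex> chain = new ArrayDeque<>();
    private WriteBatchWithIndex current = new WriteBatchWithIndex(true);

    WriteBatchChain(RocksDB db) { this.db = db; }

    // StreamThread writes always go to the newest batch.
    synchronized void put(byte[] key, byte[] value) throws RocksDBException {
        current.put(key, value);
    }

    // Called when a new Interactive Query arrives: freeze the current
    // batch (queries may keep iterating over it) and start a new one.
    synchronized WriteBatchWithIndex freeze() {
        chain.addLast(current);
        current = new WriteBatchWithIndex(true);
        return chain.peekLast();
    }

    // On commit, write every frozen batch, then the current one. A real
    // implementation would only close each batch once all Interactive
    // Queries holding Iterators on it have been closed (ref-counting).
    synchronized void commit(WriteOptions opts) throws RocksDBException {
        for (WriteBatchWithIndex frozen : chain) {
            db.write(opts, frozen);
        }
        chain.clear();
        db.write(opts, current);
        current = new WriteBatchWithIndex(true);
    }
}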

Obviously a drawback of this approach is the potential for increased memory usage: if an Interactive Query is long-lived, for example by doing a full scan over a large database, or even just pausing in the middle of an iteration, then the existing chain of WriteBatches could be kept around for a long time, potentially forever.

--

A.
Going off on a tangent, it looks like in addition to supporting READ_COMMITTED queries, we could go further and support REPEATABLE_READ queries (i.e. where subsequent reads of the same key in the same Interactive Query are guaranteed to yield the same value) by making use of RocksDB Snapshots[2]. These are fairly lightweight, so the performance impact is likely to be negligible, but they do require that the Interactive Query session can be explicitly closed.

This could be achieved if we made the above readOnlyView interface look more like:

interface ReadOnlyKeyValueView<K, V> extends ReadOnlyKeyValueStore<K, V>, AutoCloseable {}

interface ReadOnlyKeyValueStore<K, V> {
    ...
    ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel isolationLevel);
}

But this would be a breaking change, as existing IQv1 queries are guaranteed to never call store.close(), and therefore these would leak memory under REPEATABLE_READ.
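For what it's worth, here's roughly how close() on such a view could release the underlying Snapshot (names illustrative, error handling elided):

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;

final class SnapshotBackedView implements AutoCloseable {
    private final RocksDB db;
    private final Snapshot snapshot;
    private final ReadOptions readOptions;

    SnapshotBackedView(RocksDB db) {
        this.db = db;
        this.snapshot = db.getSnapshot();
        // Every read through these options sees the same point-in-time
        // state, giving REPEATABLE_READ semantics for this session.
        this.readOptions = new ReadOptions().setSnapshot(snapshot);
    }

    byte[] get(byte[] key) throws RocksDBException {
        return db.get(readOptions, key);
    }

    @Override
    public void close() {
        db.releaseSnapshot(snapshot);
        readOptions.close();
    }
}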

B.
One thing that's notable: MyRocks states that they support READ_COMMITTED and REPEATABLE_READ, but they make no mention of READ_UNCOMMITTED[3][4]. This could be because doing so is technically difficult/impossible using the primitives available in RocksDB.

--

Lucas, to address your points:

U1.
It's only "SHOULD" to permit alternative (i.e. non-RocksDB)
implementations of StateStore that do not support atomic writes.
Obviously
in those cases, the guarantees Kafka Streams provides/expects would
be
relaxed. Do you think we should require all implementations to
support
atomic writes?

U2.
Stores can support multiple IsolationLevels. As we've discussed above, the ideal scenario would be to specify the IsolationLevel at query-time. Failing that, I think the second-best approach is to define the IsolationLevel for *all* queries based on the processing.mode, which is what the default StateStoreContext#isolationLevel() achieves. Would you prefer an alternative?

While the existing implementation is equivalent to READ_UNCOMMITTED, this can yield unexpected results/errors under EOS, if a transaction is rolled back. While this would be a change in behaviour for users, it would look more like a bug fix than a breaking change. That said, we *could* make it configurable, and default to the existing behaviour (READ_UNCOMMITTED) instead of inferring it from the processing.mode?

N1, N2.
These were only primitives to avoid boxing costs, but since this is not a performance-sensitive area, it should be fine to change if that's desirable.

N3.
It's because the store "manages its own offsets", which includes both committing the offset *and providing it* via getCommittedOffset(). Personally, I think "managesOffsets" conveys this best, but I don't mind changing it if the nomenclature is unclear.

Sorry for the massive emails/essays!
--
Nick

1: https://github.com/facebook/rocksdb/wiki/Transactions
2: https://github.com/facebook/rocksdb/wiki/Snapshot
3: https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
4: https://mariadb.com/kb/en/myrocks-transactional-isolation/

On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

Hi Nick,

since I last read it in April, the KIP has become much cleaner and
easier to read. Great work!

It feels to me the last big open point is whether we can implement the isolation level as a query parameter. I understand that there are implementation concerns, but as Colt says, it would be a great addition, and would also simplify the migration path for this change. Is the implementation problem you mentioned caused by the WriteBatch not having a notion of a snapshot, as the underlying DB iterator does? In that case, I am not sure a chain of WriteBatches as you propose would fully solve the problem, but maybe I didn't dig enough into the details to fully understand it.

If it's not possible to implement it now, would it be an option to make sure in this KIP that we do not fully close the door on per-query isolation levels in the interface, as it may be possible to implement the missing primitives in RocksDB or Speedb in the future?

Understanding:

* U1) Why is it only "SHOULD" for changelogOffsets to be persisted atomically with the records?
* U2) I don't understand the default implementation of `isolationLevel`. The isolation level should be a property of the underlying store, and not be defined by the default config? Existing stores probably don't guarantee READ_COMMITTED, so the default should be to return READ_UNCOMMITTED.

Nits:
* N1) Could `getCommittedOffset` use an `OptionalLong` return type, to avoid the `null`?
* N2) Could `approximateNumUncommittedBytes` use an `OptionalLong` return type, to avoid the `-1`?
* N3) I don't understand why `managesOffsets` uses the 'manage' verb, whereas all other methods use the "commits" verb. I'd suggest `commitsOffsets`.

Either way, it feels like this KIP is very close to the finish line. I'm looking forward to seeing this in production!

Cheers,
Lucas

On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <c...@littlehorse.io> wrote:

Making IsolationLevel a query-time constraint, rather than linking it to the processing.guarantee.

As I understand it, would this allow even a user of EOS to control whether to read committed or uncommitted records? If so, I am highly in favor of this.

I know that I was one of the early people to point out the current shortcoming that IQ reads uncommitted records, but just this morning I realized a pattern we use which means that (for certain queries) our system needs to be able to read uncommitted records, which is the current behavior of Kafka Streams in EOS.***

If IsolationLevel being a query-time decision allows for this, then that would be amazing. I would also vote that the default behavior should be reading uncommitted records, because it is totally possible for a valid application to depend on that behavior, and breaking it in a minor release might be a bit strong.

*** (Note, for the curious reader...) Our use-case/query pattern is a bit complex, but reading "uncommitted" records is actually safe in our case because processing is deterministic. Additionally, IQ being able to read uncommitted records is crucial to enable "read your own writes" on our API: due to the deterministic processing, we send an "ack" to the client who makes the request as soon as the processor processes the result. If they can't read uncommitted records, they may receive a "201 - Created" response, immediately followed by a "404 - Not Found" when doing a lookup for the object they just created.

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <nick.telf...@gmail.com> wrote:

Addendum:

I think we would also face the same problem with the approach John outlined earlier (using the record cache as a transaction buffer and flushing it straight to SST files). This is because the record cache (the ThreadCache class) is not thread-safe, so every commit would invalidate open IQ Iterators in the same way that RocksDB WriteBatches do.
--
Nick

On Wed, 13 Sept 2023 at 16:58, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno,

I've updated the KIP based on our conversation. The only things I've not yet done are:

1. Using transactions under ALOS and EOS.
2. Making IsolationLevel a query-time constraint, rather than linking it to the processing.guarantee.

There's a wrinkle that makes this a challenge: Interactive Queries that open an Iterator, when using transactions and READ_UNCOMMITTED.
The problem is that under READ_UNCOMMITTED, queries need to be able to read records from the currently uncommitted transaction buffer (WriteBatch). This includes Iterators, which should iterate both the transaction buffer and the underlying database (using WriteBatch#iteratorWithBase()).
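Something like this, assuming the transaction buffer is a WriteBatchWithIndex (RocksJava spells the method newIteratorWithBase()):

import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatchWithIndex;

final class UncommittedIteratorExample {
    // Returns an Iterator over both the uncommitted batch and the
    // underlying database; batch entries shadow the base entries.
    static RocksIterator uncommittedIterator(RocksDB db,
                                             WriteBatchWithIndex buffer) {
        RocksIterator base = db.newIterator();
        return buffer.newIteratorWithBase(base);
    }
}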

The issue is that when the StreamThread commits, it writes the current WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the WriteBatch while an Interactive Query holds an open Iterator on it will invalidate the Iterator. Worse, it turns out that Iterators over a WriteBatch become invalidated not just when the WriteBatch is cleared, but also when the Iterator's current key receives a new write.

Now that I'm writing this, I remember that this is the major reason that I switched the original design from having a query-time IsolationLevel to having the IsolationLevel linked to the transactionality of the stores themselves.

It *might* be possible to resolve this, by having a "chain" of WriteBatches, with the StreamThread switching to a new WriteBatch whenever a new Interactive Query attempts to read from the database, but that could cause some performance problems/memory pressure when subjected to a high Interactive Query load. It would also reduce the efficiency of WriteBatches on commit, as we'd have to write N WriteBatches, where N is the number of Interactive Queries since the last commit.

I realise this is getting into the weeds of the implementation, and you'd rather we focus on the API for now, but I think it's important to consider how to implement the desired API, in case we come up with an API that cannot be implemented efficiently, or even at all!

Thoughts?
--
Nick

On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

6.
Of course, you are right! My bad!
Wiping out the state in the downgrading case is fine.


3a.
Focus on the public-facing changes for the KIP. We will manage to get the internals right. Regarding state stores that do not support READ_COMMITTED, they should throw an error stating that they do not support READ_COMMITTED. No need to adapt all state stores immediately.

3b.
I am in favor of using transactions also for ALOS.


Best,
Bruno

On 9/13/23 11:57 AM, Nick Telford wrote:
Hi Bruno,

Thanks for getting back to me!

2.
The fact that implementations can always track estimated memory usage in the wrapper is a good point. I can remove -1 as an option, and I'll clarify the JavaDoc that 0 is not just for non-transactional stores, which is currently misleading.

6.
The problem with catching the exception in the downgrade process is that it would require new code in the Kafka version being downgraded to. Since users could conceivably downgrade to almost *any* older version of Kafka Streams, I'm not sure how we could add that code?
The only way I can think of doing it would be to provide a dedicated downgrade tool that goes through every local store and removes the offsets column families. But that seems like an unnecessary amount of extra code to maintain just to handle a somewhat niche situation, when the alternative (automatically wipe and restore stores) should be acceptable.

1, 4, 5: Agreed. I'll make the changes you've requested.

3a.
I agree that IsolationLevel makes more sense at query-time, and I actually initially attempted to place the IsolationLevel at query-time, but I ran into some problems:
- The key issue is that, under ALOS, we're not staging writes in transactions, so we can't perform writes at the READ_COMMITTED isolation level. However, this may be addressed if we decide to *always* use transactions, as discussed under 3b.
- IQv1 and IQv2 have quite different implementations. I remember having some difficulty understanding the IQv1 internals, which made it difficult to determine what needed to be changed. However, I *think* this can be addressed for both implementations by wrapping the RocksDBStore in an IsolationLevel-dependent wrapper, that overrides read methods (get, etc.) to either read directly from the database or from the ongoing transaction. But IQv1 might still be difficult.
- If IsolationLevel becomes a query constraint, then all other StateStores will need to respect it, including the in-memory stores. This would require us to adapt in-memory stores to stage their writes so they can be isolated from READ_COMMITTED queries. It would also become an important consideration for third-party stores on upgrade, as without changes, they would not support READ_COMMITTED queries correctly.

Ultimately, I may need some help making the necessary changes to IQv1 to support this, but I don't think it's fundamentally impossible, if we want to pursue this route.

3b.
The main reason I chose to keep ALOS un-transactional was to minimize behavioural change for most users (I believe most Streams users use the default configuration, which is ALOS). That said, it's clear that if ALOS also used transactional stores, the only change in behaviour would be that it becomes *more correct*, which could be considered a "bug fix" by users, rather than a change they need to handle.

I believe that performance using transactions (aka RocksDB WriteBatches) should actually be *better* than the un-batched write path that is currently used[1]. The only "performance" consideration will be the increased memory usage that transactions require. Given the mitigations for this memory that we have in place, I would expect that this is not a problem for most users.

If we're happy to do so, we can make ALOS also use transactions.

Regards,
Nick

Link 1:
https://github.com/adamretter/rocksjava-write-methods-benchmark#results

On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for the updates and sorry for the delay on my side!


1.
Making the default implementation for flush() a no-op sounds good to me.


2.
I think what was bugging me here is that a third-party state store needs to implement the state store interface. That means they need to implement a wrapper around the actual state store, as we do for RocksDB with RocksDBStore. So, a third-party state store can always estimate the uncommitted bytes, if it wants, because the wrapper can record the added bytes.
One case I can think of where returning -1 makes sense is when Streams does not need to estimate the size of the write batch and trigger extraordinary commits, because the third-party state store takes care of memory. But in that case the method could also just return 0. Even that case would be better solved with a method that returns whether the state store manages the memory used for uncommitted bytes itself or not.
That said, I am fine with keeping the -1 return value; I was just wondering when and if it will be used.

Regarding returning 0 for transactional state stores when the batch is empty, I was just wondering because you explicitly stated

"or {@code 0} if this StateStore does not support transactions."

So it seemed to me returning 0 could only happen for non-transactional state stores.
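Just to spell out the contract we are discussing (the containing interface is hypothetical and the wording is illustrative, not final JavaDoc):

// Hypothetical container for the method under discussion.
public interface UncommittedBytesContract {
    /**
     * @return the approximate number of uncommitted bytes buffered by
     *         this store: 0 if nothing is currently buffered (including
     *         for stores that manage their own memory), or -1 if the
     *         store is transactional but cannot estimate the size.
     */
    long approximateNumUncommittedBytes();
}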


3.

a) What do you think if we move the isolation level to IQ (v1 and v2)? In the end, this is the only component that really needs to specify the isolation level. It is similar to the Kafka consumer that can choose with what isolation level to read the input topic.
For IQv1, the isolation level should go into StoreQueryParameters. For IQv2, I would add it to the Query interface.

b) Point a) raises the question what should happen during at-least-once processing when the state store does not use transactions? John in the past proposed to also use transactions on state stores for at-least-once. I like that idea, because it avoids aggregating the same records over and over again in the case of a failure. We had a case in the past where a Streams application in at-least-once mode was failing continuously, for some reasons I do not remember, before committing the offsets. After each failover, the app aggregated the same records again and again. Of course the aggregate increased to very wrong values just because of the failovers. With transactions on the state stores we could have avoided this. The app would have output the same aggregate multiple times (i.e., after each failover), but at least the value of the aggregate would not depend on the number of failovers. Outputting the same aggregate multiple times would be incorrect under exactly-once, but it is OK for at-least-once.
Whether it makes sense to add a config to turn transactions on state stores on and off under at-least-once, or to just use transactions in any case, is a question we should also discuss in this KIP. It depends a bit on the performance trade-off. Maybe, to be safe, I would add a config.


4.
Your points are all valid. I tend to say to keep the metrics around flush() until we remove flush() completely from the interface. Calls to flush() might still exist since existing processors might still call flush() explicitly, as you mentioned in 1). For sure, we need to document how the metrics change due to the transactions in the upgrade notes.


5.
I see. Then you should describe how the .position files are handled in a dedicated section of the KIP, or incorporate the description into the "Atomic Checkpointing" section, instead of only mentioning it in the "Compatibility, Deprecation, and Migration Plan".


6.
Describing upgrading and downgrading in the KIP is a good idea. Regarding downgrading, I think you could also catch the exception and do what is needed to downgrade, e.g., drop the column family. See here for an example:

https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75

It is a bit brittle, but it works.
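Roughly, the downgrade path could look like this (a sketch only: the "offsets" column family name is hypothetical, and error handling is omitted):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

final class DowngradeExample {
    static void dropOffsetsColumnFamily(String path) throws RocksDBException {
        byte[] offsetsCf = "offsets".getBytes(StandardCharsets.UTF_8);
        // Open with every column family RocksDB reports, then drop the
        // one the downgraded version does not understand.
        List<byte[]> existing = RocksDB.listColumnFamilies(new Options(), path);
        List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
        for (byte[] name : existing) {
            descriptors.add(new ColumnFamilyDescriptor(name));
        }
        List<ColumnFamilyHandle> handles = new ArrayList<>();
        try (DBOptions options = new DBOptions();
             RocksDB db = RocksDB.open(options, path, descriptors, handles)) {
            for (int i = 0; i < existing.size(); i++) {
                if (Arrays.equals(existing.get(i), offsetsCf)) {
                    db.dropColumnFamily(handles.get(i));
                }
            }
        }
    }
}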


Best,
Bruno


On 8/24/23 12:18 PM, Nick Telford wrote:
Hi Bruno,

Thanks for taking the time to review the KIP. I'm back from leave now and intend to move this forwards as quickly as I can.

Addressing your points:

1.
Because flush() is part of the StateStore API, it's exposed to custom Processors, which might be making calls to flush(). This was actually the case in a few integration tests.
To maintain as much compatibility as possible, I'd prefer not to make this an UnsupportedOperationException, as it would cause previously working Processors to start throwing exceptions at runtime.
I agree that it doesn't make sense for it to proxy commit(), though, as that would cause it to violate the "StateStores commit only when the Task commits" rule.
Instead, I think we should make this a no-op. That way, existing user Processors will continue to work as before, without the violation of store consistency that would be caused by premature flush/commit of StateStore data to disk.
What do you think?
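i.e. something like this in the StateStore interface (a sketch, not the final signature):

public interface FlushExample {
    // Sketch: flush() becomes a no-op by default, rather than throwing
    // or proxying commit(), so existing Processors keep working.
    default void flush() {
        // no-op: data is persisted when the Task commits
    }
}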

2.
As stated in the JavaDoc, when a StateStore implementation is transactional, but is unable to estimate the uncommitted memory usage, the method will return -1.
The intention here is to permit third-party implementations that may not be able to estimate memory usage.

Yes, it will be 0 when nothing has been written to the store yet. I thought that was implied by "This method will return an approximation of the memory that would be freed by the next call to {@link #commit(Map)}" and "@return The approximate size of all records awaiting {@link #commit(Map)}"; however, I can add it explicitly to the JavaDoc if you think this is unclear?

3.
I realise this is probably the most contentious point in my design, and I'm open to changing it if I'm unable to convince you of the benefits. Nevertheless, here's my argument:
The Interactive Query (IQ) API(s) are directly provided StateStores to query, and it may be important for users to programmatically know which mode the StateStore is operating under. If we simply provide an "eosEnabled" boolean (as used throughout the internal streams engine), or similar, then users will need to understand the operation and consequences of each available processing mode and how it pertains to their StateStore.

Interactive Query users aren't the only people that care about the processing.mode/IsolationLevel of a StateStore: implementers of custom StateStores also need to understand the behaviour expected of their implementation. KIP-892 introduces some assumptions into the Streams Engine about how StateStores operate under each processing mode, and it's important that custom implementations adhere to those assumptions in order to maintain the consistency guarantees.

IsolationLevels provide a high-level contract on the behaviour of the StateStore: a user knows that under READ_COMMITTED, they will see writes only after the Task has committed, and under READ_UNCOMMITTED they will see writes immediately. No understanding of the details of each processing.mode is required, either for IQ users or StateStore implementers.

An argument can be made that these contractual guarantees can simply be documented for the processing.mode (i.e. that exactly-once and exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves like READ_UNCOMMITTED), but there are several small issues with this I'd prefer to avoid:

    - Where would we document these contracts, in a way that is difficult for users/implementers to miss/ignore?
    - It's not clear to users that the processing mode is communicating an expectation of read isolation, unless they read the documentation. Users rarely consult documentation unless they feel they need to, so it's likely this detail would get missed by many users.
    - It tightly couples processing modes to read isolation. Adding new processing modes, or changing the read isolation of existing processing modes, would be difficult/impossible.

Ultimately, the cost of introducing IsolationLevels is just a single method, since we re-use the existing IsolationLevel enum from Kafka. This gives us a clear place to document the contractual guarantees expected of/provided by StateStores, that is accessible both by the StateStore itself, and by IQ users.

(Writing this I've just realised that the StateStore and IQ APIs actually don't provide access to the StateStoreContext that IQ users would have direct access to... Perhaps StateStore should expose isolationLevel() itself too?)

4.
Yeah, I'm not comfortable renaming the metrics in-place either, as it's a backwards-incompatible change. My concern is that, if we leave the existing "flush" metrics in place, they will be confusing to users. Right now, "flush" metrics record explicit flushes to disk, but under KIP-892, even a commit() will not explicitly flush data to disk - RocksDB will decide on when to flush memtables to disk itself.

If we keep the existing "flush" metrics, we'd have two options, which both seem pretty bad to me:

    1. Have them record calls to commit(), which would be misleading, as data is no longer explicitly "flushed" to disk by this call.
    2. Have them record nothing at all, which is equivalent to removing the metrics, except that users will see the metric still exists and so assume that the metric is correct, and that there's a problem with their system when there isn't.

I agree that removing them is also a bad solution, and I'd like some guidance on the best path forward here.

5.
Position files are updated on every write to a StateStore. Since our writes are now buffered until commit(), we can't update the Position file until commit() has been called, otherwise it would be inconsistent with the data in the event of a rollback. Consequently, we need to manage these offsets the same way we manage the checkpoint offsets, and ensure they're only written on commit().

6.
Agreed, although I'm not exactly sure yet what tests to write. How explicit do we need to be here in the KIP?

As for upgrade/downgrade: upgrade is designed to be seamless, and we should definitely add some tests around that. Downgrade, it transpires, isn't currently possible, as the extra column family for offset storage is incompatible with the pre-KIP-892 implementation: when you open a RocksDB database, you must open all available column families or receive an error.
What currently happens on downgrade is that it attempts to open the store, throws an error about the offsets column family not being opened, which triggers a wipe and rebuild of the Task. Given that downgrades should be uncommon, I think this is acceptable behaviour, as the end state is consistent, even if it results in an undesirable state restore.

Should I document the upgrade/downgrade behaviour explicitly in the KIP?

--

Regards,
Nick


On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick!

Thanks for the updates!

1.
Why does StateStore#flush() default to StateStore#commit(Collections.emptyMap())?
Since calls to flush() will not exist anymore after this KIP is released, I would rather throw an unsupported operation exception by default.


2.
When would a state store return -1 from StateStore#approximateNumUncommittedBytes() while being transactional?

Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if the state store is transactional but nothing has been written to the state store yet?


3.
Sorry for bringing this up again. Does this KIP really need to introduce StateStoreContext#isolationLevel()? StateStoreContext already has appConfigs(), which basically exposes the same information, i.e., whether EOS is enabled or not.
In one of your previous e-mails you wrote:

"My idea was to try to keep the StateStore interface as loosely coupled from the Streams engine as possible, to give implementers more freedom, and reduce the amount of internal knowledge required."

While I understand the intent, I doubt that it decreases the coupling of the StateStore interface and the Streams engine. READ_COMMITTED only applies to IQ but not to reads by processors. Thus, implementers need to understand how Streams accesses the state stores.

I would like to hear what others think about this.


4.
Great job exposing new metrics for transactional state stores! However, I would prefer to add new metrics and deprecate (in the docs) the old ones. You can find examples of deprecated metrics here:
https://kafka.apache.org/documentation/#selector_monitoring


5.
Why does the KIP mention position files? I do not think they are related to transactions or flushes.


6.
I think we will also need to adapt/add integration tests besides unit tests. Additionally, we probably need integration or system tests to verify that upgrades and downgrades between transactional and non-transactional state stores work as expected.


Best,
Bruno





On 7/21/23 10:34 PM, Nick Telford wrote:
One more thing: I noted John's suggestion in the KIP, under "Rejected Alternatives". I still think it's an idea worth pursuing, but I believe that it's out of the scope of this KIP, because it solves a different set of problems to this KIP, and the scope of this one has already grown quite large!

On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

I've updated the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores) with the latest changes; mostly bringing back "Atomic Checkpointing" (for what feels like the 10th time!). I think the one thing missing is some changes to metrics (notably the store "flush" metrics will need to be renamed to "commit").

The reason I brought back Atomic Checkpointing was to decouple store flush from store commit. This is important, because with Transactional StateStores, we now need to call "flush" on *every* Task commit, and not just when the StateStore is closing, otherwise our transaction buffer will never be written and persisted, instead growing unbounded! I experimented with some simple solutions, like forcing a store flush whenever the transaction buffer was likely to exceed its configured size, but this was brittle: it prevented the transaction buffer from being configured to be unbounded, and it still would have required explicit flushes of RocksDB, yielding sub-optimal performance and memory utilization.

I deemed Atomic Checkpointing to be the "right" way to resolve this problem. By ensuring that the changelog offsets that correspond to the most recently written records are always atomically written to the StateStore (by writing them to the same transaction buffer), we can avoid forcibly flushing the RocksDB memtables to disk, letting RocksDB flush them only when necessary, without losing any of our consistency guarantees. See the updated KIP for more info.

I have fully implemented these changes, although I'm still not entirely happy with the implementation for segmented StateStores, so I plan to refactor that. Despite that, all tests pass. If you'd like to try out or review this highly experimental and incomplete branch, it's available here: https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's built against Kafka 3.5.0 so that I had a stable base to build and test it on, and to enable easy apples-to-apples comparisons in a live environment. I plan to rebase it against trunk once it's nearer completion and has been proven on our main application.

I would really appreciate help in reviewing and testing:
- Segmented (Versioned, Session and Window) stores
- Global stores

I do not currently use either of these, so my primary test environment doesn't cover them.

I'm going on Parental Leave starting next week for a few weeks, so I will not have time to move this forward until late August. That said, your feedback is welcome and appreciated; I just won't be able to respond as quickly as usual.

Regards,
Nick

On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno

Yes, that's correct, although the impact on IQ is not something I had considered.

What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (thus, flushing the memtable) every configured amount of data and/or number of commit intervals?


I'm not quite sure I follow. Are you suggesting that we add an additional config for the max number of commit intervals between checkpoints? That way, we would checkpoint *either* when the transaction buffers are nearly full, *OR* whenever a certain number of commit intervals have elapsed, whichever comes first?

That certainly seems reasonable, although this re-ignites an earlier debate about whether a config should be measured in "number of commit intervals", instead of just an absolute time.

FWIW, I realised that this issue is the reason I was pursuing the Atomic Checkpoints, as it de-couples memtable flush from checkpointing, which enables us to just checkpoint on every commit without any performance impact. Atomic Checkpointing is definitely the "best" solution, but I'm not sure if this is enough to bring it back into this KIP.

I'm currently working on moving all the transactional logic directly into RocksDBStore itself, which does away with the StateStore#newTransaction method, and reduces the number of new classes introduced, significantly reducing the complexity. If it works, and the complexity is drastically reduced, I may try bringing back Atomic Checkpoints into this KIP.

Regards,
Nick

On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for the insights! Very interesting!

As far as I understand, you want to atomically update the state store from the transaction buffer, flush the memtable of a state store and write the checkpoint not after the commit time elapsed, but after the transaction buffer reached a size that would lead to exceeding statestore.transaction.buffer.max.bytes before the next commit interval ends.
That means the Kafka transaction would commit every commit interval, but the state store would only be atomically updated roughly every statestore.transaction.buffer.max.bytes of data. Also, IQ would then only see new data roughly every statestore.transaction.buffer.max.bytes. After a failure, the state store needs to restore up to statestore.transaction.buffer.max.bytes.

Is this correct?

What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (thus, flushing the memtable) every configured amount of data and/or number of commit intervals? In such a way, we would have the same delay for records appearing in output topics and IQ, because both would appear when the Kafka transaction is committed. However, after a failure the state store still needs to restore up to statestore.transaction.buffer.max.bytes, and it might restore data that is already in the state store because the checkpoint lags behind the last stable offset (i.e. the last committed offset) of the changelog topics. Restoring data that is already in the state store is idempotent, so EOS should not be violated.
This solution needs at least one new config to specify when a checkpoint should be written.



A small correction to your previous e-mail that does not change anything you said: under ALOS the default commit interval is 30 seconds, not five seconds.


Best,
Bruno


On 01.07.23 12:37, Nick Telford wrote:
Hi everyone,

I've begun performance testing my branch on our staging environment, putting it through its paces in our non-trivial application. I'm already observing the same increased flush rate that we saw the last time we attempted to use a version of this KIP, but this time, I think I know why.

Pre-KIP-892, StreamTask#postCommit, which is called at the end of the Task commit process, has the following behaviour:

    - Under ALOS: checkpoint the state stores. This includes flushing memtables in RocksDB. This is acceptable because the default commit.interval.ms is 5 seconds, so forcibly flushing memtables every 5 seconds is acceptable for most applications.
    - Under EOS: checkpointing is not done, *unless* it's being forced, due to e.g. the Task closing or being revoked. This means that under normal processing conditions, the state stores will not be checkpointed, and will not have memtables flushed at all, unless RocksDB decides to flush them on its own. Checkpointing stores and force-flushing their memtables is only done when a Task is being closed.

Under EOS, KIP-892 needs to checkpoint stores on at least *some* normal Task commits, in order to write the RocksDB transaction buffers to the state stores, and to ensure the offsets are synced to disk to prevent restores from getting out of hand. Consequently, my current implementation calls maybeCheckpoint on *every* Task commit, which is far too frequent. This causes checkpoints every 10,000 records, which is a change in flush behaviour, potentially causing performance problems for some applications.

I'm looking into possible solutions, and I'm currently leaning towards using the statestore.transaction.buffer.max.bytes configuration to checkpoint Tasks once we are likely to exceed it. This would complement the existing "early Task commit" functionality that this configuration provides, in the following way:

    - Currently, we use statestore.transaction.buffer.max.bytes to force an early Task commit if processing more records would cause our state store transactions to exceed the memory assigned to them.
    - New functionality: when a Task *does* commit, we will not checkpoint the stores (and hence flush the transaction buffers) unless we expect to cross the statestore.transaction.buffer.max.bytes threshold before the next commit.

I'm also open to suggestions.
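For concreteness, the decision rule I have in mind is roughly this (illustrative only; how the per-interval write rate would be estimated is an open question):

final class CheckpointDecisionExample {
    // Checkpoint on commit only if the transaction buffer would likely
    // exceed its limit before the next commit completes.
    static boolean shouldCheckpoint(long uncommittedBytes,
                                    long estimatedBytesPerCommitInterval,
                                    long transactionBufferMaxBytes) {
        return uncommittedBytes + estimatedBytesPerCommitInterval
                >= transactionBufferMaxBytes;
    }
}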

Regards,
Nick

On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno!

3.
By "less predictable for users", I meant in terms of
understanding
the
performance profile under various circumstances. The
more
complex
the
solution, the more difficult it would be for users to
understand
the
performance they see. For example, spilling records to
disk
when
the
transaction buffer reaches a threshold would, I
expect,
reduce
write
throughput. This reduction in write throughput could
be
unexpected,
and
potentially difficult to diagnose/understand for
users.
At the moment, I think the "early commit" concept is
relatively
straightforward; it's easy to document, and
conceptually
fairly
obvious to
users. We could probably add a metric to make it
easier
to
understand
when
it happens though.

3. (the second one)
The IsolationLevel is *essentially* an indirect way of telling StateStores whether they should be transactional. READ_COMMITTED essentially requires transactions, because it dictates that two threads calling `newTransaction()` should not see writes from the other transaction until they have been committed. With READ_UNCOMMITTED, all bets are off, and stores can allow threads to observe written records at any time, which is essentially "no transactions". That said, StateStores are free to implement these guarantees however they can, which is a bit more relaxed than dictating "you must use transactions". For example, with RocksDB we would implement these as READ_COMMITTED == WBWI-based "transactions", READ_UNCOMMITTED == direct writes to the database. But with other storage engines, it might be preferable to *always* use transactions, even when unnecessary; or there may be storage engines that don't provide transactions, but where the isolation guarantees can be met using a different technique.
My idea was to try to keep the StateStore interface as loosely coupled from the Streams engine as possible, to give implementers more freedom, and reduce the amount of internal knowledge required.
That said, I understand that "IsolationLevel" might not be the right abstraction, and we can always make it much more explicit if required, e.g.:

boolean transactional()
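i.e. the two API shapes under consideration would be, roughly (both just sketches, not final API):

import org.apache.kafka.common.IsolationLevel;

interface IsolationLevelShape {
    IsolationLevel isolationLevel(); // READ_COMMITTED / READ_UNCOMMITTED
}

interface ExplicitShape {
    boolean transactional(); // more explicit, but less flexible
}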

7-8.
I can make these changes either later today or tomorrow.

Small update:
I've rebased my branch on trunk and fixed a bunch of issues that needed addressing. Currently, all the tests pass, which is promising, but it will need to undergo some performance testing. I haven't (yet) worked on removing the `newTransaction()` stuff, but I would expect that, behaviourally, it should make no difference. The branch is available at https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is interested in taking an early look.

Regards,
Nick

On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

1.
Yeah, I agree with you. That was actually also my point. I understood that John was proposing the ingestion path as a way to avoid the early commits. Probably, I misinterpreted the intent.

2.
I agree with John here, that actually it is public API. My question is how this usage pattern affects normal processing.

3.
My concern is that checking for the size of the transaction buffer and maybe triggering an early commit affects the whole processing of Kafka Streams. The transactionality of a state store is not confined to the state store itself, but spills over and changes the behavior of other parts of the system. I agree with you that it is a decent compromise. I just wanted to analyse the downsides and list the options to overcome them. I also agree with you that all options seem quite heavy compared with your KIP. I do not understand what you mean by "less predictable for users", though.


I found the discussions about the alternatives really interesting. But I also think that your plan sounds good and we should continue with it!


Some comments on your reply to my e-mail on June 20th:

3.
Ah, now I understand the reasoning behind putting the isolation level in the state store context. Thanks! Should that also be a way to give the state store the opportunity to decide whether to turn on transactions or not?
With my comment, I was more concerned about how you know whether a checkpoint file needs to be written under EOS, if you do not have a way to know whether the state store is transactional or not. If a state store is transactional, the checkpoint file can be written during normal processing under EOS. If the state store is not transactional, the checkpoint file must not be written under EOS.
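In other words, something like this (illustrative only, not actual Streams code):

final class EosCheckpointRuleExample {
    // Under EOS, a checkpoint may only be written during normal
    // processing if the store is transactional; under ALOS it is
    // always safe to write one.
    static boolean mayWriteCheckpoint(boolean eosEnabled,
                                      boolean storeIsTransactional) {
        return !eosEnabled || storeIsTransactional;
    }
}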

7.
My point was about not only considering the bytes in memory in config statestore.uncommitted.max.bytes, but also bytes that might be spilled on disk. Basically, I was wondering whether you should remove the "memory" in "Maximum number of memory bytes to be used to buffer uncommitted state-store records." My thinking was that even if a state store spills uncommitted bytes to disk, limiting the overall bytes might make sense. Thinking about it again, and considering the recent discussions, it does not make too much sense anymore.
I like the name statestore.transaction.buffer.max.bytes that you proposed.

8.
A high-level description (without implementation details) of how Kafka Streams will manage the commit of changelog transactions, state store transactions and checkpointing would be great. It would also be great if you could add some sentences about the behavior in case of a failure. For instance, how does a transactional state store recover after a failure, or what happens with the transaction buffer, etc.? (That is what I meant by "fail-over" in point 9.)

Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:
Hi Bruno,

1.
Isn't this exactly the same issue that WriteBatchWithIndex transactions have, whereby exceeding (or being likely to exceed) the configured memory needs to trigger an early commit?

2.
This is one of my big concerns. Ultimately, any approach based on cracking open RocksDB internals and using it in ways it's not really designed for is likely to have some unforeseen performance or consistency issues.

3.
What's your motivation for removing these early commits? While not ideal, I think they're a decent compromise to ensure consistency whilst maintaining good and predictable performance.
All three of your suggested ideas seem *very* complicated, and might actually make behaviour less predictable for users as a consequence.

I'm a bit concerned that the scope of this KIP is growing a bit out of control. While it's good to discuss ideas for future improvements, I think it's important to narrow the scope down to a design that achieves the most pressing objectives (constant-sized restorations during dirty close/unexpected errors). Any design that this KIP produces can ultimately be changed in the future, especially if the bulk of it is internal behaviour.

I'm going to spend some time next week trying to re-work the original WriteBatchWithIndex design to remove the newTransaction() method, such that it's just an implementation detail of RocksDBStore. That way, if we want to replace WBWI with something in the future, like the SST file management outlined by John, then we can do so with little/no API changes.

Regards,

Nick